I need to transform a selection of columns of a DataFrame that has a nested structure. The transformation relies on a function that already exists.
Suppose the data looks as follows:
case class A(A: B)
case class B(B: String, C: String, D: Seq[C])
case class C(E: String, F: String)
val df = sc.parallelize(Seq(A(B("b", "c", Seq(C("e1","f1"), C("e2", "f2")))) )).toDF
df.printSchema
root
 |-- A: struct (nullable = true)
 |    |-- B: string (nullable = true)
 |    |-- C: string (nullable = true)
 |    |-- D: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- E: string (nullable = true)
 |    |    |    |-- F: string (nullable = true)
and suppose the transformation converts a string to upper case:
import org.apache.spark.sql.functions.udf
val upper: String => String = _.toUpperCase
val upperUDF = udf(upper)
Here I found an approach that partially solves my problem. Applying the code given there
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

def mutate(df: DataFrame, fn: Column => Column): DataFrame = {
  // Get a projection with fields mutated by `fn` and select it
  // out of the original frame with the schema reassigned to the original
  // frame (explained later)
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn): _*).rdd, df.schema)
}

def traverse(schema: StructType, fn: Column => Column, path: String = ""): Array[Column] = {
  schema.fields.map(f => {
    f.dataType match {
      // Recurse into nested structs, extending the dotted path
      case s: StructType => struct(traverse(s, fn, path + f.name + "."): _*)
      // Every other type (including arrays) is passed to `fn` as one column
      case _ => fn(col(path + f.name))
    }
  })
}
the following works fine for me
val df2 = mutate(df, c => if (c.toString == "A.B" || c.toString == "A.C") upperUDF(c) else c)
However, when I try to transform a column inside the nested array D, it has no effect, and no error is raised:
val df3 = mutate(df, c => if (c.toString == "A.D.F") upperUDF(c) else c)
What is going wrong here? How can I transform columns of a nested array as described above?
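Reading the code above, `traverse` only recurses into `StructType`. The array field `D` therefore falls through to the `case _` branch, and `fn` is called once with the whole column `A.D`. No column whose string form is `A.D.F` is ever passed to `fn`, so the condition never matches. One possible workaround is sketched below. It rests on assumptions that are not part of the code above: Spark 3.1 or later (for `functions.transform` and `Column.withField`), and the built-in `upper` function swapped in for the UDF. The object name `NestedArrayUpper` and the helper `upperF` are hypothetical names chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, transform, upper}

// Same shape as the case classes in the question
case class C(E: String, F: String)
case class B(B: String, C: String, D: Seq[C])
case class A(A: B)

object NestedArrayUpper {
  // Hypothetical helper: upper-cases field F of every element of the array A.D.
  // Assumes Spark 3.1+ (functions.transform is 3.0+, Column.withField is 3.1+).
  def upperF(df: DataFrame): DataFrame =
    df.withColumn(
      "A",
      // Replace field D of struct A with a transformed copy of the array
      col("A").withField(
        "D",
        // Apply the lambda to each struct element, replacing only its field F
        transform(col("A.D"), elem => elem.withField("F", upper(elem.getField("F"))))
      )
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("nested-array-upper")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(A(B("b", "c", Seq(C("e1", "f1"), C("e2", "f2"))))).toDF()
    upperF(df).show(false)

    spark.stop()
  }
}
```

Since `withField` replaces a field in place, field names and nesting stay as they were, so this sketch does not go through the `createDataFrame(..., df.schema)` step that `mutate` uses. On older Spark versions, where these two calls are not available, a different approach would be needed.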