Reputation: 1492
I am trying to query a Spark SQL DataFrame with complex types, where the function should itself be able to create an expression to generate the column for nested complex data types. Say:
case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
an_array: Array[Int], a_map: Map[String, String],
a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
val df = sc.parallelize(Seq(
Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
Array(
ArrayElement("foo", 1, Array(1.0, 2.0)),
ArrayElement("bar", 2, Array(3.0, 4.0)))),
Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF
Referring to Querying Spark SQL DataFrame with complex types, the query for extracting from the map type could be:
df.select($"a_map.foo").show
Now, if I have
case class Record(
an_array: Array[Int], a_map_new: Map[String, Array[ArrayElement]],
a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
instead of Map[String, String], how do I create a UDF that takes the name (or the index, in the case of an array) and generates the result for that nested element in the complex datatype?
Say, for example, I now want to query on vals[0] contained in a_map_new.
Upvotes: 1
Views: 862
Reputation: 35249
In this case, where you have well-defined record types, I'd recommend using a strongly typed Dataset:
val result = df.as[Record].map(_.a_map_new.mapValues(_.headOption))
result.printSchema
// root
// |-- value: map (nullable = true)
// | |-- key: string
// | |-- value: struct (valueContainsNull = true)
// | | |-- foo: string (nullable = true)
// | | |-- bar: integer (nullable = false)
// | | |-- vals: array (nullable = true)
// | | | |-- element: double (containsNull = false)
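To reach vals(0) specifically, as asked, the same typed approach extends naturally. A minimal sketch, assuming df here was built with the a_map_new variant of Record and that empty arrays should map to None:
// For each map entry, take the first ArrayElement (if any) and then the first element of its vals
val firstVals = df.as[Record].map(
  _.a_map_new.mapValues(_.headOption.flatMap(_.vals.headOption)))
firstVals.printSchema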
With udf the obstacle is its asymmetric nature: the input struct arrives as a Row, but any generic solution which would return a struct would have to know how to map Row to external types. I guess you could design something like this (pseudocode):
def f(mapper: Row => T) = udf((map: Map[U, Row]) => map.mapValues(mapper(_)))
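For the concrete case in the question (vals[0] inside a_map_new, whose values are arrays of structs), one possible concretisation is sketched below; it assumes the struct elements reach the UDF as Rows and that either array may be empty:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Sketch only: for every value of a_map_new (seen here as a Seq[Row]),
// take the first struct, read its vals field, and return vals(0) if present
val firstVal = udf((m: Map[String, Seq[Row]]) =>
  m.map { case (k, elems) =>
    k -> elems.headOption.flatMap(_.getAs[Seq[Double]]("vals").headOption)
  })

df.select(firstVal($"a_map_new") as "first_vals").show(false)
Note this is column-specific, which is exactly the asymmetry described above: the mapper has to know the concrete field (vals) and the external type it returns.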
Upvotes: 1