Vishal

Reputation: 1492

Dynamically query Spark SQL DataFrame with complex type

I am trying to query a Spark SQL DataFrame with complex types, where the function should itself be able to create an expression that generates the column for nested complex datatypes. Say:

case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
  an_array: Array[Int], a_map: Map[String, String], 
  a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])

val df = sc.parallelize(Seq(
  Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
         Array(
           ArrayElement("foo", 1, Array(1.0, 2.0)),
           ArrayElement("bar", 2, Array(3.0, 4.0)))),
  Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
         Array(ArrayElement("foz", 3, Array(5.0, 6.0)), 
               ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF

Referenced from Querying Spark SQL DataFrame with complex types.

To extract a value from the map type, the query could be:

df.select($"a_map.foo").show

Now, if I have

case class Record(
  an_array: Array[Int], a_map_new: Map[String, Array[ArrayElement]], 
  a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])

instead of Map[String, String], how do I create a udf that takes a field name (or an index, in the case of an array) and generates the result for that nested element of the complex datatype? Say, for example, I now want to query vals[0] contained in a_map_new.
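
For reference, a hard-coded version of the target query can be written with plain column expressions (a sketch, assuming the key "foo"):

df.select(
  $"a_map_new".getItem("foo") // Array[ArrayElement] for key "foo"
    .getItem(0)               // first ArrayElement struct
    .getField("vals")         // its vals array
    .getItem(0)               // vals[0]
).show

What I am after is a function that builds such an expression generically, given a key or index.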

Upvotes: 1

Views: 862

Answers (1)

Alper t. Turker

Reputation: 35249

In this case, where you have well defined record types, I'd recommend using a strongly typed Dataset:

// Keep only the first ArrayElement (if any) for each map key
val result = df.as[Record].map(_.a_map_new.mapValues(_.headOption))

result.printSchema
// root
//  |-- value: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: struct (valueContainsNull = true)
//  |    |    |-- foo: string (nullable = true)
//  |    |    |-- bar: integer (nullable = false)
//  |    |    |-- vals: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)
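
Extending the same idea to the exact target of the question, vals[0] for each key, a minimal sketch could be (assuming Scala 2.11/2.12, where mapValues still returns a Map; on 2.13 you would need .toMap):

// For each key: first ArrayElement (if any), then its first vals entry (if any)
val firstVals = df.as[Record]
  .map(_.a_map_new.mapValues(_.headOption.flatMap(_.vals.headOption)))

The result is a Dataset[Map[String,Option[Double]]], with None wherever the outer array or vals is empty.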

With a udf, the obstacle is its asymmetric nature (a concrete sketch follows this list):

  • It receives the internal type Row (for struct inputs).
  • It should return an external type.
  • It has to have a statically defined return type.
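
A minimal, non-generic sketch for this particular schema illustrates all three points (assuming the a_map_new column from the question; firstVals is just an illustrative name):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Input arrives as Map[String, Seq[Row]], the internal representation of
// Map[String, Array[ArrayElement]]; the return type, Map[String, Option[Double]],
// is an external type fixed at compile time.
val firstVals = udf { m: Map[String, Seq[Row]] =>
  m.map { case (k, elems) =>
    // First ArrayElement (if any), then its first vals entry (if any)
    k -> elems.headOption.flatMap(_.getAs[Seq[Double]]("vals").headOption)
  }
}

df.select(firstVals($"a_map_new")).show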

Any generic solution that returned a struct would have to know how to map it to external types. I guess you could design something like this (pseudocode):

def f(mapper: Row => T) = udf((map: Map[U, Row]) => map.mapValues(mapper(_)))
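
A compilable version of that idea could look like this (a sketch; the helper name mapValuesUdf is hypothetical, and it only covers maps whose values are single structs):

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// U (key type) and T (the mapper's result type) must be external types with
// TypeTags, because the udf's return schema is derived statically; the Row
// input side is not checked by Spark.
def mapValuesUdf[U: TypeTag, T: TypeTag](mapper: Row => T) =
  udf { m: Map[U, Row] => m.map { case (k, v) => k -> mapper(v) } }

For a_map_new, whose values are arrays of structs rather than single structs, the input type would instead be Map[U, Seq[Row]], as in the sketch above.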

Upvotes: 1
