Reputation: 275
I'm writing a UDAF with the following buffer schema:
override def bufferSchema: StructType = StructType(
  StructField("grades", MapType(
    StructType(StructField("subject", StringType) :: StructField("subject_type", StringType) :: Nil),
    ArrayType(StructType(StructField("date", LongType) :: StructField("grade", IntegerType) :: Nil)))) :: Nil)
It looks like Spark internally interprets the key type as GenericRowWithSchema rather than a simple (String, String), so whenever I try to pull from the map:
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  var buffer_scoresMap = buffer.getAs[Map[(String, String), Array[..]]](0)
  buffer_scoresMap.get(("k1", "k2"))
returns None, even though the key is definitely in the map; I can even see it in the debugger.
I tried converting the keys to GenericRowWithSchema and then back to (String, String) before getting from the map, with no luck.
Any idea?
Upvotes: 0
Views: 749
Reputation: 37832
Indeed, tuples are converted into Structs and are not converted back into tuples when they are part of a deeply-nested column. In other words, buffer_scoresMap actually has the type Map[Row, Array[..]], so you can create a Row to fetch items from it:
var buffer_scoresMap = buffer.getAs[Map[Row, Array[..]]](0)
buffer_scoresMap.get(Row("k1","k2")) // should not be None if key exists
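If you want the keys back as tuples (as you tried), you can rebuild the map yourself. A minimal sketch, assuming each key Row carries the two string fields (subject, subject_type) from your schema:
val tupleKeyed = buffer_scoresMap.map {
  case (k, v) => (k.getString(0), k.getString(1)) -> v // Row key -> (String, String) key
}
tupleKeyed.get(("k1", "k2")) // tuple lookups work on the rebuilt map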
Here's a short example that proves this:
// create a simple DF with a similar schema (spark-shell assumed;
// otherwise you'll also need import spark.implicits._ for toDF):
import org.apache.spark.sql.Row

case class Record(grades: Map[(String, String), Array[Int]])
val df = sc.parallelize(Seq(Record(Map(("a", "b") -> Array(1, 2))))).toDF("grades")

// looking the key up as a tuple indeed fails:
df.rdd.map(r => r.getAs[Map[(String, String), Array[Int]]](0).get(("a", "b"))).first() // None

// but looking it up as a Row works:
df.rdd.map(r => r.getAs[Map[Row, Array[Int]]](0).get(Row("a", "b"))).first() // Some(WrappedArray(1, 2))
Upvotes: 1