Hagai

Reputation: 275

Spark cannot get elements from 'MapType'

I'm writing a UDAF with the following buffer schema:

bufferSchema: StructType = StructType(
  StructField("grades",
    MapType(
      StructType(StructField("subject", StringType) :: StructField("subject_type", StringType) :: Nil),
      ArrayType(StructType(StructField("date", LongType) :: StructField("grade", IntegerType) :: Nil))
    )
  ) :: Nil
)

It looks like Spark internally represents the key type as GenericRowWithSchema instead of a simple (String, String), so whenever I try to pull from the map:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  var buffer_scoresMap = buffer.getAs[Map[(String, String), Array[..]]](0)

buffer_scoresMap.get(("k1","k2")) returns None even though this key is definitely in the map; I can even see it in the debugger. I also tried converting the keys to GenericRowWithSchema and back to (String, String) before the lookup, with no luck.

Any idea?

Upvotes: 0

Views: 749

Answers (1)

Tzach Zohar

Reputation: 37832

Indeed, tuples are converted into Structs and not converted back into tuples when they are part of a deeply-nested column. In other words, buffer_scoresMap actually has the type Map[Row, Array[..]], so you can create a Row to fetch items from it:

var buffer_scoresMap = buffer.getAs[Map[Row, Array[..]]](0)
buffer_scoresMap.get(Row("k1","k2")) // should not be None if key exists

Here's a short example that proves this:

// create a simple DF with similar schema: 
case class Record(grades: Map[(String, String), Array[Int]])
val df = sc.parallelize(Seq(Record(Map(("a", "b") -> Array(1, 2))))).toDF("grades")

// this indeed fails:
df.rdd.map(r => r.getAs[Map[(String, String), Array[Int]]](0).get(("a", "b"))).first() // None

// but this works:
df.rdd.map(r => r.getAs[Map[Row, Array[Int]]](0).get(Row("a", "b"))).first() // Some(WrappedArray(1, 2))
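The root cause is plain Scala Map key equality: the map's actual keys are Row instances, and a (String, String) tuple never equals a Row, so get with a tuple returns None. The same failure mode can be sketched without Spark at all, using a hypothetical StructKey case class as a stand-in for Row:

```scala
// StructKey is a hypothetical stand-in for Spark's Row: structurally
// like a (String, String) tuple, but a different type with its own equals
case class StructKey(subject: String, subjectType: String)

// A map whose actual keys are StructKey instances, mirroring what Spark builds
val m: Map[Any, Array[Int]] = Map(StructKey("a", "b") -> Array(1, 2))

val byTuple = m.get(("a", "b"))           // None: a tuple never equals a StructKey
val byKey   = m.get(StructKey("a", "b"))  // found: the key types match
```

This is why no amount of conversion on your side helps unless the key you pass to get is itself a Row with the same field values.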

Upvotes: 1
