Bagel912

Reputation: 331

How to apply a custom Aggregator to a Spark Dataset?

I have a Spark Dataset of student records with the following schema and rows.

id | name | subject | score
1  | Tom  | Math    | 99
1  | Tom  | Math    | 88
1  | Tom  | Physics | 77
2  | Amy  | Math    | 66

My goal is to transform this dataset into another one that lists, for each student, the highest score in every subject:

id | name | subject_score_list
1  | Tom  | [(Math, 99), (Physics, 77)]
2  | Amy  | [(Math, 66)]

I've decided to use an Aggregator for the transformation, after first mapping the dataset into ((id, name), (subject, score)) key-value pairs.

For the buffer I tried to use a mutable Map[String, Integer] so I can update the score if the subject already exists and the new score is higher. Here is what the aggregator looks like:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

type StudentSubjectPair = ((String, String), (String, Integer))
type SubjectMap = collection.mutable.Map[String, Integer]
type SubjectList = List[(String, Integer)]

val StudentSubjectAggregator = new Aggregator[StudentSubjectPair, SubjectMap, SubjectList] {
  def zero: SubjectMap = collection.mutable.Map[String, Integer]()

  def reduce(buf: SubjectMap, input: StudentSubjectPair): SubjectMap = {
    if (buf.contains(input._2._1))
      buf.map{ case (input._2._1, score) => input._2._1 -> math.max(score, input._2._2) }
    else
      buf(input._2._1) = input._2._2
    buf
  }

  def merge(b1: SubjectMap, b2: SubjectMap): SubjectMap = {
    for ((subject, score) <- b2) {
      if (b1.contains(subject))
        b1(subject) = math.max(score, b2(subject))
      else
        b1(subject) = score
    }
    b1
  }

  def finish(buf: SubjectMap): SubjectList = buf.toList

  override def bufferEncoder: Encoder[SubjectMap] = ExpressionEncoder[SubjectMap]
  override def outputEncoder: Encoder[SubjectList] = ExpressionEncoder[SubjectList]
}.toColumn.name("subject_score_list")

I chose an Aggregator because it is customizable: if I later want the mean score of a subject, I only need to change the reduce and merge functions. I have two questions:

  1. Is an Aggregator a good way to get this job done? Is there a simpler way to get the same output?
  2. What is the correct encoder for collection.mutable.Map[String, Integer] and List[(String, Integer)]? I always get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 1 times, most recent failure: Lost task 0.0 in stage 37.0 (TID 231, localhost, executor driver):
java.lang.ClassCastException: scala.collection.immutable.HashMap$HashTrieMap cannot be cast to scala.collection.mutable.Map
    at $anon$1.merge(<console>:54)

I appreciate any input and help, thanks!

Upvotes: 1

Views: 428

Answers (1)

mikeL

Reputation: 1114

I think you can achieve your desired result with the DataFrame API.

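// Assumes a SparkSession named `spark`, as in spark-shell.
import org.apache.spark.sql.functions.{collect_list, map, max}
import spark.implicits._

val df = Seq((1, "Tom", "Math", 99),
    (1, "Tom", "Math", 88),
    (1, "Tom", "Physics", 77),
    (2, "Amy", "Math", 66)).toDF("id", "name", "subject", "score")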

Group by id, name, and subject to get the max score, then group by id and name and apply collect_list to a map of subject to score:

df.groupBy("id", "name", "subject")
  .agg(max("score").as("score"))
  .groupBy("id", "name")
  .agg(collect_list(map($"subject", $"score")).as("subject_score_list"))


+---+----+--------------------+
| id|name|  subject_score_list|
+---+----+--------------------+
|  1| Tom|[[Physics -> 77],...|
|  2| Amy|      [[Math -> 66]]|
+---+----+--------------------+
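
On the ClassCastException: the stack trace says an immutable HashTrieMap is being passed to merge, so the buffer does not seem to come back as a mutable Map. One possible way around it, if you still want an Aggregator, is to write the reduce and merge logic against an immutable Map and return a new map instead of updating in place. Note also that the merge in the question compares score with b2(subject), which is the same value, so it never looks at b1. Below is a minimal sketch of that logic in plain Scala, without Spark. The object and method names (SubjectMax, update, merge) are made up for illustration, and switching the buffer to Map[String, Int] is an assumption on my side that I have not run inside an actual Aggregator.

```scala
// Hypothetical helper, not part of Spark or of the question's code.
object SubjectMax {
  // Immutable buffer: subject -> highest score seen so far.
  type Scores = Map[String, Int]

  // Keep the higher of the stored score and the incoming one.
  def update(buf: Scores, subject: String, score: Int): Scores =
    buf.updated(subject, math.max(buf.getOrElse(subject, score), score))

  // Combine two partial buffers by folding b2's entries into b1.
  def merge(b1: Scores, b2: Scores): Scores =
    b2.foldLeft(b1) { case (acc, (subject, score)) => update(acc, subject, score) }
}
```

Inside an Aggregator, reduce could call update with input._2._1 and input._2._2, and merge could delegate to SubjectMax.merge; since both return a new Map, nothing depends on the buffer being mutable.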

Upvotes: 2
