Reputation: 331
I have a Spark Dataset of student records with the following schema and data.
id | name | subject | score
1 | Tom | Math | 99
1 | Tom | Math | 88
1 | Tom | Physics | 77
2 | Amy | Math | 66
My goal is to transform this dataset into another one that shows, for every student, a list of their highest score in each subject:
id | name | subject_score_list
1 | Tom | [(Math, 99), (Physics, 77)]
2 | Amy | [(Math, 66)]
I've decided to use an Aggregator to do the transformation, after first mapping this dataset into ((id, name), (subject, score)) key-value pairs.
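For reference, here is a minimal, untested sketch of that mapping step; the case class StudentRecord, the Dataset name students, and the use of String ids are just for illustration and are chosen to match the key type the aggregator below expects:

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class mirroring the input schema above
case class StudentRecord(id: String, name: String, subject: String, score: Int)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Illustrative input Dataset
val students: Dataset[StudentRecord] = Seq(
  StudentRecord("1", "Tom", "Math", 99),
  StudentRecord("1", "Tom", "Math", 88),
  StudentRecord("1", "Tom", "Physics", 77),
  StudentRecord("2", "Amy", "Math", 66)
).toDS()

// Map each record into the ((id, name), (subject, score)) key-value shape;
// the ascription boxes the Int scores to java.lang.Integer to match the aggregator's input type below
val pairs = students.map(r =>
  ((r.id, r.name), (r.subject, r.score)): ((String, String), (String, Integer)))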
For the buffer I tried to use a mutable Map[String, Integer], so I can update the score in place when the subject already exists and the new score is higher. Here's what the aggregator looks like:
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

type StudentSubjectPair = ((String, String), (String, Integer))
type SubjectMap = collection.mutable.Map[String, Integer]
type SubjectList = List[(String, Integer)]

val StudentSubjectAggregator = new Aggregator[StudentSubjectPair, SubjectMap, SubjectList] {
  def zero: SubjectMap = collection.mutable.Map[String, Integer]()

  def reduce(buf: SubjectMap, input: StudentSubjectPair): SubjectMap = {
    if (buf.contains(input._2._1))
      buf(input._2._1) = math.max(buf(input._2._1), input._2._2)  // keep the higher score
    else
      buf(input._2._1) = input._2._2
    buf
  }

  def merge(b1: SubjectMap, b2: SubjectMap): SubjectMap = {
    for ((subject, score) <- b2) {
      if (b1.contains(subject))
        b1(subject) = math.max(score, b1(subject))  // keep the higher of the two buffers' scores
      else
        b1(subject) = score
    }
    b1
  }

  def finish(buf: SubjectMap): SubjectList = buf.toList

  override def bufferEncoder: Encoder[SubjectMap] = ExpressionEncoder[SubjectMap]
  override def outputEncoder: Encoder[SubjectList] = ExpressionEncoder[SubjectList]
}.toColumn.name("subject_score_list")
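The aggregator column is then applied to the keyed pairs roughly like this (continuing from the mapping sketch above; as posted, this is the job that fails with the ClassCastException quoted further down):

// Group by the (id, name) key and aggregate each group's (subject, score) pairs
val result = pairs
  .groupByKey(_._1)
  .agg(StudentSubjectAggregator)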
I use an Aggregator because I find it customizable: if I want the mean score of each subject instead, I can simply change the reduce and merge functions.
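For example, a mean-score variant could keep a running (sum, count) per subject in the buffer. A rough, untested sketch of the changed functions (the names are illustrative, and the StudentSubjectPair alias is reused from the code above; finish also changes, since it has to divide at the end):

// Mean-score variant: the buffer keeps a running (sum, count) per subject
type MeanBuffer = collection.mutable.Map[String, (Long, Long)]

def reduceMean(buf: MeanBuffer, input: StudentSubjectPair): MeanBuffer = {
  val (subject, score) = input._2
  val (sum, count) = buf.getOrElse(subject, (0L, 0L))
  buf(subject) = (sum + score.longValue, count + 1)
  buf
}

def mergeMean(b1: MeanBuffer, b2: MeanBuffer): MeanBuffer = {
  for ((subject, (sum, count)) <- b2) {
    val (s, c) = b1.getOrElse(subject, (0L, 0L))
    b1(subject) = (s + sum, c + count)
  }
  b1
}

// Divide sum by count per subject to produce the mean
def finishMean(buf: MeanBuffer): List[(String, Double)] =
  buf.toList.map { case (subject, (sum, count)) => subject -> sum.toDouble / count }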
I'm expecting answers to two questions in this post:
1. Is it a good choice to use an Aggregator to get this job done? Is there any other simple way to get the same output?
2. What are the correct encoders for collection.mutable.Map[String, Integer] and List[(String, Integer)]? I always get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 1 times, most recent failure: Lost task 0.0 in stage 37.0 (TID 231, localhost, executor driver):
java.lang.ClassCastException: scala.collection.immutable.HashMap$HashTrieMap cannot be cast to scala.collection.mutable.Map
  at $anon$1.merge(<console>:54)
I'd appreciate any input and help, thanks!
Upvotes: 1
Views: 428
Reputation: 1114
I think you can achieve your desired result with the DataFrame API.
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1, "Tom", "Math", 99),
  (1, "Tom", "Math", 88),
  (1, "Tom", "Physics", 77),
  (2, "Amy", "Math", 66)).toDF("id", "name", "subject", "score")
Group by id, name, and subject for the max score, followed by a groupBy on id, name with a collect_list over a map of subject to score:
df.groupBy("id", "name", "subject").agg(max("score").as("score"))
  .groupBy("id", "name")
  .agg(collect_list(map($"subject", $"score")).as("subject_score_list"))
+---+----+--------------------+
| id|name| subject_score_list|
+---+----+--------------------+
| 1| Tom|[[Physics -> 77],...|
| 2| Amy| [[Math -> 66]]|
+---+----+--------------------+
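If you'd rather get a list of (subject, score) tuples like the expected output in the question, a variant of the same idea (untested sketch) collects structs instead of one-entry maps:

// Collect (subject, score) structs instead of single-entry maps
df.groupBy("id", "name", "subject").agg(max("score").as("score"))
  .groupBy("id", "name")
  .agg(collect_list(struct($"subject", $"score")).as("subject_score_list"))
  .show(false)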
Upvotes: 2