Reputation: 2541
I have a use case where I intend to group by key(s) while aggregating over column(s). I am using Dataset and tried to achieve these operations with `groupBy` and `agg`. For example, take the following scenario:
case class Result(deptId: String, locations: Seq[String])
case class Department(deptId: String, location: String)
// using spark 2.0.2
// I have a Dataset `ds` of type Department
+------+---------+
|deptId| location|
+------+---------+
|    d1|    delhi|
|    d1|   mumbai|
|   dp2| calcutta|
|   dp2|hyderabad|
+------+---------+
I intended to convert it to
// Dataset `result` of type Result
+------+---------------------+
|deptId|            locations|
+------+---------------------+
|    d1|      [delhi, mumbai]|
|   dp2|[calcutta, hyderabad]|
+------+---------------------+
For this I searched on Stack Overflow and found the following:

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)

val result = ds
  .groupBy("deptId")
  .agg(flatten(collect_list("location")).as("locations"))
The above seemed pretty neat to me. But before settling on it, I first looked for a built-in `reduceByKey` on Dataset, like an RDD has. I couldn't find one, so I opted for the above. But then I read the article groupByKey vs reduceByKey and learned that `reduceByKey` has fewer shuffles and is more efficient. This is my first reason to ask the question: should I opt for RDD in my scenario?

My second reason concerns type safety. I initially chose Dataset because my input is typed as `Department`. But as my result has an entirely different schema, should I bother with type safety? I tried doing `result.as[Result]`, but that doesn't seem to do any compile-time type check. Another reason I chose Dataset is that I'll pass the result Dataset to some other function; having a structure makes the code easy to maintain. Also, the case class can be highly nested, and I cannot imagine maintaining that nesting in a pair RDD while writing reduce/map operations.

My last question is about `udf`. I came across a post where people said they would prefer converting a Dataset to an RDD rather than using a udf for complex aggregations/groupBy.

PS: please forgive me if I used some terms incorrectly.
Upvotes: 1
Views: 134
Reputation: 56
To answer some of your questions:
- `groupBy` + `agg` is not `groupByKey` (see DataFrame / Dataset groupBy behaviour/optimization) in the general case. There are specific cases where it might behave like one; this includes `collect_list`. You can compare physical plans with `explain()`, as in the sketch after this list.
- `reduceByKey` is not better than RDD-style `groupByKey` when `groupByKey`-like logic is required (see Be Smart About groupByKey), and in fact it is almost always worse.
- There is an important trade-off between static type checking and performance in Spark's `Dataset` (see Spark 2.0 Dataset vs DataFrame).
- The linked post specifically advises against using `UserDefinedAggregateFunction` (not `UserDefinedFunction`) because of excessive copying of data (see Spark UDAF with ArrayType as bufferSchema performance issues).
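A quick way to check the first point is to compare physical plans. The following is a minimal sketch (it assumes a SparkSession in scope as `spark`; exact operator names in the output differ between Spark versions):

import spark.implicits._
import org.apache.spark.sql.functions.{collect_list, count}

val demo = Seq(
  ("d1", "delhi"), ("d1", "mumbai"),
  ("dp2", "calcutta"), ("dp2", "hyderabad")
).toDF("deptId", "location")

// A reducible aggregate: the plan contains a partial aggregation before
// the shuffle, so only per-partition counts move between nodes.
demo.groupBy("deptId").agg(count("location")).explain()

// collect_list: every value for a key must end up in one place, so the
// shuffle moves all the data, which is groupByKey-like behaviour.
demo.groupBy("deptId").agg(collect_list("location")).explain()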
You don't even need a `UserDefinedFunction`, as flattening is not required in your case:
import spark.implicits._  // for .toDF on a local Seq
import org.apache.spark.sql.functions.collect_list

val df = Seq[Department]().toDF
df.groupBy("deptId").agg(collect_list("location").as("locations"))
And this is what you should go for.
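For reference, here is a self-contained run on the sample data (the local SparkSession setup is illustrative, not part of the solution; `Department` is the case class defined in the question):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder().master("local[*]").appName("dept-demo").getOrCreate()
import spark.implicits._

val ds = Seq(
  Department("d1", "delhi"), Department("d1", "mumbai"),
  Department("dp2", "calcutta"), Department("dp2", "hyderabad")
).toDS

ds.groupBy("deptId").agg(collect_list("location").as("locations")).show(truncate = false)
// +------+---------------------+
// |deptId|locations            |
// +------+---------------------+
// |d1    |[delhi, mumbai]      |
// |dp2   |[calcutta, hyderabad]|
// +------+---------------------+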
A statically typed equivalent would be
import spark.implicits._  // provides the encoders for Department and Result

val ds = Seq[Department]().toDS
ds
  .groupByKey(_.deptId)
  .mapGroups { case (deptId, xs) => Result(deptId, xs.map(_.location).toSeq) }
but it would be considerably more expensive than the `DataFrame` option.
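If you still want a typed `Dataset[Result]` without paying for `mapGroups`, the untyped result can be cast back with `.as[Result]`. As noted in the question, this is only verified when the query is analyzed at runtime, not at compile time. A minimal sketch (the name `typedResult` is illustrative):

import spark.implicits._
import org.apache.spark.sql.functions.collect_list

// Column names and types are checked against Result's encoder at
// analysis time (runtime), not by the Scala compiler.
val typedResult = ds
  .groupBy("deptId")
  .agg(collect_list("location").as("locations"))
  .as[Result]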
Upvotes: 4