Reputation: 75
I'm working on AWS EMR with Spark version 2.4.7-amzn-1, using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302).
I wanted to reduce a dataset of my custom case class Item by key, where the key itself is a custom case class. However, reduceByKey did not work as I expected.
Here are the two classes:
case class Key(
name: String,
color: String
)
case class Item(
name: String,
color: String,
count: Int
) {
def key: Key = Key(name, color)
}
To aggregate, I defined a custom combine function in Item's companion object that just adds up the counts:
object Item {
def combine(i1: Item, i2: Item): Item = i1.copy(count = i1.count + i2.count)
}
Here's my aggregate function:
import org.apache.spark.sql.Dataset
import spark.implicits._
def aggregate(items: Dataset[Item]): Dataset[Item] = items
.rdd
.keyBy(_.key)
.reduceByKey(Item.combine)
.map(_._2)
.toDS
Now if I try to aggregate...
val items: Dataset[Item] = spark.sparkContext.parallelize(
Seq(
Item("Square", "green", 8),
Item("Triangle", "blue", 3),
Item("Square", "green", 5),
Item("Triangle", "blue", 7)
)
).toDS
val aggregated: Dataset[Item] = aggregate(items)
aggregated.show
...the output shows that the dataset has not been reduced:
+--------+-----+-----+
| name|color|count|
+--------+-----+-----+
| Square|green| 8|
| Square|green| 5|
|Triangle| blue| 3|
|Triangle| blue| 7|
+--------+-----+-----+
However, I observed that the aggregation did work when I changed the order of the four items in the sequence, so the outcome is not consistent.
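To narrow this down, one diagnostic I can think of (just a sketch, nothing authoritative) is to count the distinct keys of the keyed RDD before reducing; distinct also shuffles by key, so it exercises the same comparison mechanism:
// If the keys were compared structurally, this should print 2;
// if it matches the number of items (4), equal keys are not being collapsed.
val distinctKeyCount = items.rdd.keyBy(_.key).keys.distinct.count
println(distinctKeyCount)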
If I change the key from being a case class instance
def key: Key = Key(name, color)
to a tuple
def key: Tuple2[String, String] = (name, color)
the aggregation works as expected, giving this output:
+--------+-----+-----+
| name|color|count|
+--------+-----+-----+
| Square|green| 13|
|Triangle| blue| 10|
+--------+-----+-----+
So, does reduceByKey in general not (reliably) work with case classes? Is this the expected behavior? Or does this have nothing to do with case class vs. tuple, and the real cause lies hidden somewhere else?
My Key class seems quite simple to me, so I guess it's not a hashing or comparison issue. (I could be wrong.)
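For instance, in a plain local REPL the compiler-generated structural equality of Key looks fine to me (just a sanity check of the case class itself, outside of Spark):
val k1 = Key("Square", "green")
val k2 = Key("Square", "green")
// Case classes get structural equals and hashCode generated by the compiler,
// so both of these print true locally.
println(k1 == k2)
println(k1.hashCode == k2.hashCode)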
I also looked at the question "reduceByKey using Scala object as key", but there the cause turned out to be a typo, and chrisbtk explicitly stated: "Spark knows how to compare two object even if they do not implement Ordered."
Do I always have to use tuples as keys?
Upvotes: 3
Views: 367
Reputation: 946
Try using the Dataset API directly:
With the following in scope:
import sparkSession.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
// explicit encoder for the case-class key, used by groupByKey below
implicit val key: Encoder[Key] = Encoders.product[Key]
You can do:
items
.groupByKey(_.key)
.reduceGroups(Item.combine)
.map(_._2)
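Wrapped into the aggregate function from the question, this would look roughly as follows (a sketch assuming the Key, Item, and Item.combine definitions above):
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import sparkSession.implicits._

implicit val keyEncoder: Encoder[Key] = Encoders.product[Key]

def aggregate(items: Dataset[Item]): Dataset[Item] = items
  .groupByKey(_.key)          // needs the implicit Encoder[Key]
  .reduceGroups(Item.combine) // Dataset[(Key, Item)]
  .map(_._2)                  // keep only the combined Item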
Upvotes: 1