TheGreenHeptagon

Reputation: 75

reduceByKey with case class instance as the key

I'm working on AWS EMR with Spark version 2.4.7-amzn-1, using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302).

I wanted to reduce a dataset of my custom case class Item by key, where the key itself is a custom case class. However, reduceByKey did not work as I expected.

Here are the two classes:

case class Key(
  name: String,
  color: String
)

case class Item(
  name: String,
  color: String,
  count: Int
) {
  def key: Key = Key(name, color)
}

To aggregate, I defined a custom combine function in Item's companion object that just adds up the counts:

object Item {
  def combine(i1: Item, i2: Item): Item = i1.copy(count = i1.count + i2.count)
}
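
As a minimal illustration (plain Scala, no Spark required), combining two matching items sums their counts while keeping the first item's name and color:

Item.combine(Item("Square", "green", 8), Item("Square", "green", 5))
// => Item("Square", "green", 13)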

Here's my aggregate function:

import org.apache.spark.sql.Dataset
import spark.implicits._

def aggregate(items: Dataset[Item]): Dataset[Item] = items
  .rdd                        // drop down to the RDD API
  .keyBy(_.key)               // RDD[(Key, Item)]
  .reduceByKey(Item.combine)  // merge items that share a key
  .map(_._2)                  // discard the keys
  .toDS                       // back to a Dataset

Now if I try to aggregate...

val items: Dataset[Item] = spark.sparkContext.parallelize(
  Seq(
    Item("Square", "green", 8),
    Item("Triangle", "blue", 3),
    Item("Square", "green", 5),
    Item("Triangle", "blue", 7)
  )
).toDS

val aggregated: Dataset[Item] = aggregate(items)

aggregated.show

...the output shows that the dataset has not been reduced:

+--------+-----+-----+
|    name|color|count|
+--------+-----+-----+
|  Square|green|    8|
|  Square|green|    5|
|Triangle| blue|    3|
|Triangle| blue|    7|
+--------+-----+-----+

However, I observed that the aggregation did work when I changed the order of the four items in the sequence, so the outcome is not consistent.

If I change the key from being a case class instance

def key: Key = Key(name, color)

into being a tuple

def key: Tuple2[String, String] = (name, color)

the aggregation works as expected, giving this output:

+--------+-----+-----+
|    name|color|count|
+--------+-----+-----+
|  Square|green|   13|
|Triangle| blue|   10|
+--------+-----+-----+

So, does reduceByKey in general not work (reliably) with case classes? Is this the expected behavior? Or does this have nothing to do with case class vs. tuple, and does the real cause lie hidden somewhere else? My Key class seems quite simple to me, so I guess it's not a hashing or comparison issue. (I could be wrong.)
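
For what it's worth, a quick check in the same shell session (a minimal sketch; k1 and k2 are throwaway names) suggests that plain equality and hashing behave as expected on the driver:

val k1 = Key("Square", "green")
val k2 = Key("Square", "green")
k1 == k2                    // true
k1.hashCode == k2.hashCode  // true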

I also looked at this question reduceByKey using Scala object as key, but there the cause turned out to be a typo, and chrisbtk explicitly stated: "Spark knows how to compare two object even if they do not implement Ordered."

Do I always have to use tuples as keys?

Upvotes: 3

Views: 367

Answers (1)

gatear

Reputation: 946

Try using the Dataset API directly:

Having:

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

implicit val keyEncoder: Encoder[Key] = Encoders.product[Key]

You can do:

items
  .groupByKey(_.key)          // KeyValueGroupedDataset[Key, Item]
  .reduceGroups(Item.combine) // Dataset[(Key, Item)]
  .map(_._2)                  // keep only the combined items
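
Slotting this into the original aggregate function looks like the sketch below (under the imports above; note that import spark.implicits._ already derives a product encoder for case classes, so the explicit Encoders.product line should be optional):

def aggregate(items: Dataset[Item]): Dataset[Item] = items
  .groupByKey(_.key)          // stays in the Dataset API, no detour through RDDs
  .reduceGroups(Item.combine)
  .map(_._2)

On the sample data this should produce the two reduced rows (Square/green/13 and Triangle/blue/10).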

Upvotes: 1
