Matt

Reputation: 793

Replace groupByKey with reduceByKey in Spark

Hello, I often need to use groupByKey in my code, but I know it's a very heavy operation. Since I'm working on improving performance, I was wondering whether my approach of removing all groupByKey calls is actually more efficient.

I used to create an RDD from another RDD, producing pairs of type (Int, Int):

rdd1 = [(1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)]

and since I needed to obtain something like this:

[(1, [2, 3]), (2 , [3, 4]), (3, [5])]

What I used was out = rdd1.groupByKey, but since this approach might be very problematic with huge datasets, I thought of using this solution:

Instead of creating my RDD rdd1 with pairs of type (Int, Int), I create it with pairs of type (Int, List[Int]), so rdd1 looks like this:

rdd1 = [(1, [2]), (1, [3]), (2 , [3]), (2, [4]), (3, [5])]

This time, to reach the same result, I use reduceByKey(_ ::: _) to concatenate all the values by key, which is supposed to be faster. Do you think this approach might improve performance? And isn't it wasteful to use the type (Int, List[Int]), creating pairs whose value is a list containing only one element?

Is there a faster way to reach the same result using some other method? Thank you.

Upvotes: 1

Views: 3530

Answers (3)

Jagannath

Reputation: 4025

Maybe a bit late to answer this, but it might help others.

import scala.collection.mutable

val tuples = List((1, 2), (1, 3), (2, 3), (2, 4), (3, 5))
val context = getContext() // placeholder: obtain your SparkContext here
val tuplesRDD = context.parallelize(tuples)

// Zero value: Spark serializes it and hands each key its own copy.
val list = mutable.MutableList.empty[Int]
// Adds a single value to a key's partition-local list.
val addItemsToList = (s: mutable.MutableList[Int], v: Int) => s += v
// Merges the partial lists built on different partitions.
val mergeLists = (x: mutable.MutableList[Int],
                  y: mutable.MutableList[Int]) => x ++= y

val groupByKey = tuplesRDD.aggregateByKey(list)(addItemsToList, mergeLists)
groupByKey.cache()
groupByKey.foreach(x => println(x))

Output

(1,MutableList(2, 3))
(2,MutableList(3, 4))
(3,MutableList(5))

Upvotes: 2

Cyrille Corpet

Reputation: 5305

I don't think you should use reduceByKey if your end result is to be

[(1, [2, 3]), (2 , [3, 4]), (3, [5])]

Why? Because this is what groupByKey is made for, so it probably does it best.

The problem with groupByKey is that you usually don't need a list (or an array) of all values with the same key, but rather something you can compute from that list. If you don't really need the list, you can probably do the reduction in the same step as the shuffle, using reduceByKey.

The two advantages of reduceByKey:

  • it can start reduction before shuffling (reduce values that are on the same executor, to avoid unnecessary network payload)
  • it never loads the whole array of values with the same key into memory. This is important for huge datasets, where the array may be several GB in size.

In your case, as you presented it, the first point is not very important, since there is no real reduction of the data, just concatenation. The second point does not apply, since you want the whole list.

However, I strongly suggest that you consider whether you really need the whole list, or whether it is just a step in your computation, especially if you're working with large datasets.
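To make the distinction concrete, here is a minimal sketch assuming the list was only an intermediate step toward a per-key sum. The object name, the app name, and the local[2] master are illustrative choices, not part of the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SumPerKeySketch {
  // Runs both variants on a local SparkContext and returns (viaGroup, viaReduce).
  def run(): (Map[Int, Int], Map[Int, Int]) = {
    // Assumption: a throwaway local context, just for this illustration.
    val conf = new SparkConf().setAppName("sum-per-key-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(Seq((1, 2), (1, 3), (2, 3), (2, 4), (3, 5)))

      // groupByKey sends every value through the shuffle, then sums the full list per key.
      val viaGroup = pairs.groupByKey().mapValues(_.sum).collect().toMap

      // reduceByKey sums within each partition first, so only partial sums are shuffled.
      val viaReduce = pairs.reduceByKey(_ + _).collect().toMap

      (viaGroup, viaReduce)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (viaGroup, viaReduce) = run()
    println(viaGroup)
    println(viaReduce)
  }
}
```

Both variants give the same sums; the difference is in how much data crosses the network and how much has to be held in memory per key.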

Upvotes: 3

Jacek Laskowski

Reputation: 74759

Since I'm working to improve performance I was wondering if my approach to remove all groupByKey calls is efficient.

Check out RDD.toDebugString to see the lineage of your RDD, i.e. the chain of transformations behind it, including shuffle boundaries. That should give you a pretty good overview of how fast (or not) your action is going to be.
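As a rough sketch of how that inspection could look for the two approaches from the question (the object name and local[2] master are assumptions made for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LineageSketch {
  // Returns the lineage strings of the groupByKey and reduceByKey variants.
  def run(): (String, String) = {
    // Assumption: a throwaway local context, just for this illustration.
    val conf = new SparkConf().setAppName("lineage-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(Seq((1, 2), (1, 3), (2, 3), (2, 4), (3, 5)))

      // Variant 1: plain groupByKey.
      val grouped = pairs.groupByKey()

      // Variant 2: wrap each value in a one-element list, then concatenate by key.
      val reduced = pairs.map { case (k, v) => (k, List(v)) }.reduceByKey(_ ::: _)

      (grouped.toDebugString, reduced.toDebugString)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (groupedLineage, reducedLineage) = run()
    println(groupedLineage)
    println(reducedLineage)
  }
}
```

Both lineages include a ShuffledRDD, so switching to reduceByKey does not remove the shuffle here; it only changes what is combined before it.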

Avoid ShuffledRDDs where you can, since they incur a shuffle, which is usually very expensive.

As for your idea of using reduceByKey, consider applying keyBy first, e.g.

rdd.keyBy(_.kind).reduceByKey(....)
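A slightly fuller version of that one-liner might look like the sketch below. The Event case class, its amount field, and the sample data are hypothetical and exist only for illustration; only kind comes from the snippet above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type; only the `kind` field is taken from the snippet above.
final case class Event(kind: String, amount: Int)

object KeyBySketch {
  // Returns the total amount per kind.
  def run(): Map[String, Int] = {
    // Assumption: a throwaway local context, just for this illustration.
    val conf = new SparkConf().setAppName("keyby-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val events = sc.parallelize(Seq(Event("click", 1), Event("view", 2), Event("click", 3)))

      events
        .keyBy(_.kind)          // (kind, Event) pairs
        .mapValues(_.amount)    // keep only the part that is actually reduced
        .reduceByKey(_ + _)     // combined per partition before the shuffle
        .collect()
        .toMap
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```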

You may also consider aggregateByKey, a more general transformation (like groupByKey and reduceByKey, it is built on combineByKey).

Last but not least, groupByKey has two variants that let you specify the number of partitions or a Partitioner. These can avoid an expensive shuffle when the RDD is already partitioned by the same Partitioner.
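The Partitioner point can be sketched as follows, assuming the pair RDD is partitioned once up front and then reused. The object name, the partition count of 4, and the local[2] master are arbitrary choices for illustration.

```scala
import org.apache.spark.{HashPartitioner, NarrowDependency, SparkConf, SparkContext}

object PrePartitionSketch {
  // Returns the grouped values per key and whether groupByKey reused the existing partitioning.
  def run(): (Map[Int, List[Int]], Boolean) = {
    // Assumption: a throwaway local context, just for this illustration.
    val conf = new SparkConf().setAppName("pre-partition-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val partitioner = new HashPartitioner(4) // 4 is an arbitrary choice

      // partitionBy shuffles once; cache so later stages reuse the partitioned data.
      val pairs = sc.parallelize(Seq((1, 2), (1, 3), (2, 3), (2, 4), (3, 5)))
        .partitionBy(partitioner)
        .cache()

      // Same partitioner as the parent, so this step adds only a narrow dependency.
      val grouped = pairs.groupByKey(partitioner)
      val isNarrow = grouped.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])

      // Sort the values because their order within a key is not guaranteed.
      val result = grouped.mapValues(_.toList.sorted).collect().toMap
      (result, isNarrow)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that partitionBy itself still shuffles, so this only pays off when the partitioned RDD feeds several key-based operations.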

Read up on org.apache.spark.rdd.PairRDDFunctions.

Use the web UI to get better insight into the performance of your "queries". Knowing your data will help a lot. Spend enough time on it too (as the time spent optimizing your query may be wasted otherwise).

Upvotes: 3
