Reputation: 40508
There is some scary language in the docs of groupByKey, warning that it can be "very expensive" and suggesting to use aggregateByKey instead whenever possible.
I am wondering whether the difference in cost comes from the fact that, for some aggregations, the entire group never needs to be collected and loaded onto the same node, or if there are other differences in implementation.
Basically, the question is whether rdd.groupByKey() would be equivalent to rdd.aggregateByKey(Nil)(_ :+ _, _ ++ _), or if it would still be more expensive.
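For concreteness, here is a minimal sketch of the two variants I have in mind (assuming a pair RDD of (String, Int) and an existing SparkContext sc; I use List.empty[Int] instead of Nil so the types resolve):

    import org.apache.spark.rdd.RDD

    // Made-up (key, value) pairs for illustration
    val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Variant 1: collect all values of a key into one Iterable on one node
    val grouped: RDD[(String, Iterable[Int])] = pairs.groupByKey()

    // Variant 2: build the same per-key collection with aggregateByKey
    val aggregated: RDD[(String, List[Int])] =
      pairs.aggregateByKey(List.empty[Int])(_ :+ _, _ ++ _)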
Upvotes: 4
Views: 3376
Reputation: 2959
If you are reducing to a single element instead of a list (for example, a word count), then aggregateByKey performs better, because values are combined within each partition before the shuffle and far less data goes over the network, as explained in the link performance of group by vs aggregate by.
But in your case you are merging into a list. With aggregateByKey, each partition first reduces all the values for a key into a single list and only then sends that data through the shuffle. This creates one intermediate list per key per partition, and the memory usage for that will be high.
With groupByKey, the merge happens only at the one node responsible for the key, so only one list is created per key. If you are merging into a list, groupByKey is optimal in terms of memory.
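To make the word-count case concrete, here is a rough sketch (the data and names are made up, and sc is assumed to be an existing SparkContext):

    import org.apache.spark.rdd.RDD

    // Hypothetical (word, 1) pairs
    val words: RDD[(String, Int)] = sc.parallelize(Seq(("spark", 1), ("rdd", 1), ("spark", 1)))

    // Reducing to a single element per key: partial sums are computed inside each
    // partition before the shuffle, so very little data crosses the network
    val counts: RDD[(String, Int)] = words.aggregateByKey(0)(_ + _, _ + _)

    // Merging into a list per key: each partition still materializes its own
    // intermediate list, and every element still has to reach the reducer
    val lists: RDD[(String, List[Int])] =
      words.aggregateByKey(List.empty[Int])(_ :+ _, _ ++ _)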
Also refer to: SO answer by zero323
I am not sure about your use case, but if you can limit the number of elements in the list in the end result, then aggregateByKey / combineByKey will certainly give a much better result than groupByKey. For example, if you only want the top 10 values for a given key, you can achieve this efficiently with combineByKey and proper merge and combiner functions, rather than with groupByKey followed by taking 10 from each group; see the sketch below.
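A rough sketch of that top-10 idea using aggregateByKey (the input RDD, the key/value types, and the limit of 10 are assumptions for illustration):

    import org.apache.spark.rdd.RDD

    // Hypothetical (key, score) pairs
    val scores: RDD[(String, Int)] = sc.parallelize(Seq(("a", 5), ("a", 9), ("b", 3)))

    val topN = 10

    // Add one value to a capped accumulator, keeping only the largest topN values
    def insertCapped(acc: List[Int], v: Int): List[Int] =
      (v :: acc).sorted(Ordering[Int].reverse).take(topN)

    // Merge two capped accumulators coming from different partitions
    def mergeCapped(a: List[Int], b: List[Int]): List[Int] =
      (a ++ b).sorted(Ordering[Int].reverse).take(topN)

    // Each partition ships at most topN values per key into the shuffle
    val top10PerKey: RDD[(String, List[Int])] =
      scores.aggregateByKey(List.empty[Int])(insertCapped, mergeCapped)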
Upvotes: 6
Reputation: 1410
Let me help illustrate why the groupByKey operation leads to much higher cost.
By the semantics of this operation, the reduce task has to gather all of the values associated with a single unique key.
In short, let us have a look at its signature:
def groupByKey(): RDD[(K, Iterable[V])]
Because the "groupby" operation, all values associated with this key partitioned on different nodes can not be pre-merged. Huge mount of data transfer through over the network, lead to high network io load.
But aggregateByKey is not the same with it. let me clarify the signature:
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
The Spark engine implements this operation as follows:
Within each partition the values are pre-merged (a map-side combine), which means a given reducer only needs to fetch the pre-merged intermediate results from the shuffle map tasks.
This makes the network I/O significantly lighter.
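As a rough illustration of the difference (a sketch with made-up names; sc is assumed to be an existing SparkContext):

    import org.apache.spark.rdd.RDD

    val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // groupByKey: every (key, value) pair crosses the network
    // before the per-key sum can be computed
    val sumsViaGroup: RDD[(String, Int)] = pairs.groupByKey().mapValues(_.sum)

    // aggregateByKey: each partition pre-merges its values into one partial sum
    // per key, so only those partial sums are shuffled
    val sumsViaAggregate: RDD[(String, Int)] = pairs.aggregateByKey(0)(_ + _, _ + _)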
Upvotes: -1