Reputation: 1463
Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documentation on these, but couldn't understand the exact differences.
An explanation with examples would be great.
Upvotes: 103
Views: 153206
Reputation: 1641
groupByKey:
Syntax:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map { case (word, counts) => (word, counts.sum) }
groupByKey
can cause out-of-memory (or disk spill) problems, because all the values for each key are sent over the network and collected on the reducing workers.
reduceByKey:
Syntax:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
Data is combined at each partition, so only one output per key per partition is sent over the network. reduceByKey
requires combining all your values into another value of the exact same type.
aggregateByKey:
similar to reduceByKey
, but it takes an initial (zero) value and the result type can differ from the input value type.
It takes 3 parameters as input: the zero value, a sequence function applied within each partition, and a combine function applied across partitions.
Example:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Output: Aggregate By Key sum Results: bar -> 3, foo -> 5
combineByKey:
Takes 3 parameters as input, similar to aggregateByKey
; the difference is that you need not always pass a constant zero value, you can pass a function that creates the initial combiner from the first value seen for a key. Example:
// rdd is assumed here to be a pair RDD with Int values; this computes the per-key average
val result = rdd.combineByKey(
(v: Int) => (v, 1),
(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (k, (sum, count)) => (k, sum / count.toDouble) }
result.collect.foreach(println)
reduceByKey, aggregateByKey, and combineByKey are preferred over groupByKey.
Reference: Avoid groupByKey
Upvotes: 115
Reputation: 5710
groupByKey() just groups your dataset based on a key. It results in data shuffling when the RDD is not already partitioned.
reduceByKey() is something like grouping + aggregation. We can say reduceByKey() is equivalent to dataset.group(...).reduce(...). It shuffles less data than groupByKey().
aggregateByKey() is logically the same as reduceByKey(), but it lets you return the result in a different type. In other words, it lets you have an input of type x and an aggregate result of type y. For example (1,2),(1,4) as input and (1,"six") as output. It also takes a zero value that is applied at the beginning of each key.
Note: One similarity is that they are all wide transformations.
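As an illustration of that input/output type difference, here is a minimal sketch, assuming an existing SparkContext sc and a made-up pair RDD; the accumulated result is a String even though the values are Ints, which reduceByKey() could not express:
// Hypothetical input pairs: (1,2), (1,4)
val pairs = sc.parallelize(Seq((1, 2), (1, 4)))
// zero value is an empty String; seqOp appends within a partition, combOp merges partition results
val concatenated = pairs.aggregateByKey("")(
  (acc, v) => if (acc.isEmpty) v.toString else s"$acc,$v",
  (a, b) => Seq(a, b).filter(_.nonEmpty).mkString(",")
)
// concatenated.collect() yields something like Array((1, "2,4")); value order depends on partitioning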
Upvotes: 31
Reputation: 229
Apart from these 4, we also have
foldByKey, which is the same as reduceByKey but with a user-defined zero value.
aggregateByKey takes 3 parameters as input and uses 2 functions for merging (one for merging within the same partition and another to merge values across partitions; the first parameter is the zero value),
whereas
reduceByKey takes only 1 parameter, which is a function for merging.
combineByKey takes 3 parameters, and all 3 are functions. It is similar to aggregateByKey, except it can take a function for the zero value.
groupByKey takes no parameters and groups everything. It also adds overhead for data transfer across partitions.
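Since foldByKey has no example elsewhere on this page, here is a minimal sketch, assuming an existing SparkContext sc and a small made-up RDD of (String, Int) pairs; with a zero value of 0 and addition it behaves just like reduceByKey(_ + _):
// Hypothetical word-count style input
val wordPairs = sc.parallelize(Seq(("foo", 1), ("bar", 1), ("foo", 1)))
// foldByKey = reduceByKey with an explicit zero value (0 for a sum)
val counts = wordPairs.foldByKey(0)(_ + _)
// counts.collect() returns Array((foo, 2), (bar, 1)) (ordering may vary)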
Upvotes: 2
Reputation: 946
Although both of them will fetch the same results, there is a significant difference in the performance of the two functions. reduceByKey() works better on larger datasets than groupByKey().
In reduceByKey()
, pairs on the same machine with the same key are combined (by using the function passed into reduceByKey()
) before the data is shuffled. Then the function is called again to reduce all the values from each partition to produce one final result.
In groupByKey(), all the key-value pairs are shuffled around. This is a lot of unnecessary data to be transferred over the network.
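Here is a small sketch of the two equivalent formulations, assuming an existing SparkContext sc and made-up sample data; both return the same counts, but only the reduceByKey() version combines values within each partition before the shuffle:
// Sample input split into 2 partitions
val words = sc.parallelize(Seq("spark", "spark", "hadoop", "spark"), 2)
val pairs = words.map(word => (word, 1))
// groupByKey: ships every (word, 1) pair across the network, then sums
val countsViaGroup = pairs.groupByKey().mapValues(_.sum)
// reduceByKey: sums within each partition first, then shuffles one value per key per partition
val countsViaReduce = pairs.reduceByKey(_ + _)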
Upvotes: 12
Reputation: 3780
ReduceByKey - reduceByKey(func, [numTasks])
Data is combined so that each partition has at most one combined value for each key that appears in it. Then the shuffle happens, and that combined data is sent over the network to a particular executor for an action such as reduce.
GroupByKey - groupByKey([numTasks])
It doesn't merge the values for a key; instead the shuffle happens directly, and a lot of data gets sent to each partition, almost as much as the initial data.
The merging of values for each key is done after the shuffle. A lot of data ends up stored on the final worker nodes, which can result in out-of-memory issues.
AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
It is similar to reduceByKey, but you can provide an initial value when performing the aggregation.
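To illustrate the zeroValue / seqOp / combOp shape, here is a minimal sketch with made-up data, assuming an existing SparkContext sc; the per-key result is a Set[String] even though the values are plain Strings:
// Hypothetical (user, page) visit records
val visits = sc.parallelize(Seq(("user1", "home"), ("user1", "cart"), ("user2", "home")))
// zeroValue: empty set; seqOp adds a value within a partition; combOp unions partition results
val pagesPerUser = visits.aggregateByKey(Set.empty[String])(
  (pages, page) => pages + page,
  (p1, p2) => p1 ++ p2
)
// pagesPerUser.collect() returns Array((user1, Set(home, cart)), (user2, Set(home)))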
Use of reduceByKey
reduceByKey can be used when we run on a large dataset.
reduceByKey is preferred over aggregateByKey when the input and output value types are of the same type.
Moreover, it is recommended not to use groupByKey and to prefer reduceByKey. For details you can refer here.
You can also refer to this question to understand in more detail how reduceByKey and aggregateByKey work.
Upvotes: 7
Reputation: 1959
While both reduceByKey and groupByKey will produce the same answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.
On the other hand, when calling groupByKey, all the key-value pairs are shuffled around. This is a lot of unnecessary data to be transferred over the network.
For more detail, check the link below.
Upvotes: 19