Sourav Gulati

Reputation: 1359

Apache Spark: How aggregateByKey works

I was playing with Spark, and I am a bit confused about how the aggregateByKey function works.

If I provide a non-zero initial value, it adds 2 * (initial value) to the total.

Following is the code snippet:

JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(message -> new Tuple2<String, Integer>(message.split(",")[0], Integer.parseInt(message.split(",")[1])));

Function2<Integer, Integer, Integer> mergeValue = (v1, v2) -> v1 + v2;
Function2<Integer, Integer, Integer> mergeCombiners = (v1, v2) -> v1 + v2;

JavaPairRDD<String, Integer> aggregateByKey = mapToPair.aggregateByKey(1, mergeValue, mergeCombiners);

System.out.println("Aggregate by key "+ aggregateByKey.collect());

Following is my input RDD:

hello,1
hello,1
hello,1
hello,1

Output I am getting is

Aggregate by key [(hello,6)]

Please explain how this works.

Upvotes: 1

Views: 213

Answers (2)

Aditya

Reputation: 2415

I agree with @LostInOverflow, and here is an explanation of why Spark takes a zeroValue as the first argument of aggregateByKey:

Both the 'merging values within a partition' function (argument 2) and the 'merging values between partitions' function (argument 3) read and update the first argument (zeroValue) and return it, instead of creating a new return value, to avoid extra memory allocation. This may be negligible for small-scale operations, but it is a memory-saving technique for very large-scale jobs running on clusters with hundreds of nodes.
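A minimal sketch of that in-place style (the JavaSparkContext sc and the sample data are assumptions for illustration, not from the question): both functions update their first argument and return it instead of allocating a new container on every call.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("hello", 1),
        new Tuple2<String, Integer>("hello", 1)));

// Each task deserializes its own copy of the zeroValue (an empty list here),
// so the seq and comb functions can safely update it in place and return it.
JavaPairRDD<String, List<Integer>> valuesByKey = pairs.aggregateByKey(
        new ArrayList<Integer>(),
        (list, v) -> { list.add(v); return list; },  // merge a value into the partition's list
        (l1, l2) -> { l1.addAll(l2); return l1; });  // merge two partitions' lists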

Hence the zeroValue should be chosen based on the kind of operation performed in the merge and combine functions, so that it does not affect the actual result (0 for addition, or 1 for multiplication).
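For example (reusing mapToPair from the question; a sketch, not a definitive recipe), switching the operation to multiplication makes 1 the appropriate zeroValue:

// 1 is the identity for multiplication, so seeding each partition with it
// leaves the product unchanged no matter how many partitions hold the key.
JavaPairRDD<String, Integer> productByKey = mapToPair.aggregateByKey(
        1,
        (v1, v2) -> v1 * v2,   // merge values within a partition
        (v1, v2) -> v1 * v2);  // merge results between partitions

System.out.println("Product by key " + productByKey.collect());
// For the four "hello,1" records this prints [(hello,1)] regardless of
// the partition count.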

Upvotes: 1

user6022341


zeroValue is applied every time a new key is seen on the current partition, so it can be applied as many times as there are partitions. It therefore shouldn't change the result of the merge and seq ops, which is why 0 is valid for addition but 1 is not.
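A quick way to verify this against the asker's rdd (a sketch; how records land in partitions after repartition is not guaranteed, so the exact number may vary):

// The "extra" amount in the sum is zeroValue * (number of partitions that
// contain the key). With 2 partitions and zeroValue = 1, that is 4 + 2 * 1 = 6.
System.out.println("Partitions: " + rdd.getNumPartitions());

// Forcing more partitions changes the result accordingly: if each of 4
// partitions ends up holding one "hello" record, the sum becomes 4 + 4 * 1 = 8.
JavaPairRDD<String, Integer> spread = rdd.repartition(4)
        .mapToPair(message -> new Tuple2<String, Integer>(message.split(",")[0], Integer.parseInt(message.split(",")[1])));
System.out.println(spread.aggregateByKey(1, (v1, v2) -> v1 + v2, (v1, v2) -> v1 + v2).collect());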

Upvotes: 2
