Reputation: 1359
I was playing with Spark, and I am a bit confused about how the aggregateByKey function works.
If I provide a non-zero initial value, the total ends up including 2 * the initial value.
Here is the code snippet:
JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(message -> new Tuple2<String, Integer>(message.split(",")[0], Integer.parseInt(message.split(",")[1])));
Function2<Integer, Integer, Integer> mergeValue = (v1, v2) -> v1 + v2;
Function2<Integer, Integer, Integer> mergeCombiners = (v1, v2) -> v1 + v2;
JavaPairRDD<String, Integer> aggregateByKey = mapToPair.aggregateByKey(1, mergeValue, mergeCombiners);
System.out.println("Aggregate by key " + aggregateByKey.collect());
This is my input RDD:
hello,1
hello,1
hello,1
hello,1
The output I am getting is:
Aggregate by key [(hello,6)]
Could someone please explain how this works?
Upvotes: 1
Views: 213
Reputation: 2415
I agree with @LostInOverflow, and here is why Spark takes a zeroValue as the first argument of aggregateByKey:
Both the function that merges values within a partition (argument 2) and the function that merges values between partitions (argument 3) are allowed to modify and return their first argument, the accumulator that starts out as zeroValue, instead of creating a new return value. This avoids extra memory allocation. The saving may be negligible for small jobs, but it matters for very large jobs running on clusters with hundreds of nodes.
Hence zeroValue should be a neutral value for the operation performed in the merge and combine functions, chosen so that it does not affect the actual result (0 for addition, 1 for multiplication).
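To illustrate the modify-and-return pattern described above, here is a minimal plain-Java sketch that does not use Spark at all. The class name MutableAccumulatorDemo and the helpers addValue and mergeSets are hypothetical; they only stand in for the second and third arguments of aggregateByKey, and an empty set plays the role of a neutral zeroValue for set union.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; this is not Spark code.
class MutableAccumulatorDemo {
    // Stand-in for argument 2: mutates the accumulator in place and returns it.
    static Set<String> addValue(Set<String> acc, String value) {
        acc.add(value);
        return acc;
    }

    // Stand-in for argument 3: folds the right accumulator into the left one.
    static Set<String> mergeSets(Set<String> left, Set<String> right) {
        left.addAll(right);
        return left;
    }

    public static void main(String[] args) {
        // One fresh empty set per simulated partition (the neutral starting value).
        Set<String> acc1 = new TreeSet<>();
        Set<String> acc2 = new TreeSet<>();
        addValue(addValue(acc1, "b"), "a");
        addValue(addValue(acc2, "c"), "a");

        Set<String> merged = mergeSets(acc1, acc2);
        System.out.println(merged);         // prints [a, b, c]
        System.out.println(merged == acc1); // prints true: no new set was allocated
    }
}
```

Because the empty set is neutral for union, starting every partition from it does not change the final result, which is the same reason 0 works for addition.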
Upvotes: 1
Reputation:
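zeroValue is added every time a new key is seen on the current partition, so it can be added as many times as there are partitions containing that key. It therefore shouldn't change the result of the merge and seq ops. This is why 0 is valid for addition but 1 is not. In the question, the four values sum to 4, and the output of 6 is consistent with the key appearing on two partitions, each of which added the initial value 1 once.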
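The per-partition behavior described above can be simulated in plain Java without Spark. This is only a sketch: the class ZeroValueDemo and its aggregate method are hypothetical, and the 2/2 split of the four values across two partitions is an assumption (the question does not say how the data was partitioned).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical simulation of aggregateByKey for a single key; not Spark code.
class ZeroValueDemo {
    // Each inner list holds the values of the key on one simulated partition.
    static int aggregate(List<List<Integer>> partitions, int zeroValue,
                         BinaryOperator<Integer> seqOp,
                         BinaryOperator<Integer> combOp) {
        Integer result = null;
        for (List<Integer> partition : partitions) {
            if (partition.isEmpty()) {
                continue; // key never seen here, so zeroValue is not used
            }
            int acc = zeroValue; // zeroValue enters once per partition
            for (int v : partition) {
                acc = seqOp.apply(acc, v);
            }
            result = (result == null) ? acc : combOp.apply(result, acc);
        }
        if (result == null) {
            throw new IllegalArgumentException("key has no values");
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> twoPartitions =
                Arrays.asList(Arrays.asList(1, 1), Arrays.asList(1, 1));
        List<List<Integer>> onePartition =
                Arrays.asList(Arrays.asList(1, 1, 1, 1));

        System.out.println(aggregate(twoPartitions, 1, Integer::sum, Integer::sum)); // prints 6
        System.out.println(aggregate(twoPartitions, 0, Integer::sum, Integer::sum)); // prints 4
        System.out.println(aggregate(onePartition, 1, Integer::sum, Integer::sum));  // prints 5
    }
}
```

With zeroValue 1 the total depends on how many partitions hold the key (6 with two, 5 with one), while zeroValue 0 gives 4 regardless of the partitioning.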
Upvotes: 2