Reputation: 143
Can anybody give a detailed explanation of how the aggregate action below produces the result (9,4) in Spark?
val rdd = sc.parallelize(List(1,2,3,3))
rdd.aggregate((0, 0))(
  (x, y) => (x._1 + y, x._2 + 1),
  (x, y) => (x._1 + y._1, x._2 + y._2))
res : (9,4)
Upvotes: 0
Views: 2593
Reputation: 883
Let's keep it simple.
Case 1: No partitions, so only seqOp is applied. Focus on (x, y) => (x._1 + y, x._2 + 1), where x is the running (sum, count) pair and y is the next element.
{1,2,3,3}
new value     x._1   y   x._2
(0+1,0+1)     0      1   0
(1+2,1+1)     1      2   1
(3+3,2+1)     3      3   2
(6+3,3+1)     6      3   3
Result:(9,4)
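The trace above can be reproduced in plain Scala, without a cluster. This is only a minimal sketch, assuming a single partition so that nothing but seqOp runs. The object name SeqOpTrace and the parameter names acc and elem are made up for illustration, and scanLeft stands in for Spark purely to expose every intermediate accumulator.

```scala
object SeqOpTrace {
  // Same logic as seqOp in the question: acc is the running (sum, count), elem is the next element.
  val seqOp: ((Int, Int), Int) => (Int, Int) =
    (acc, elem) => (acc._1 + elem, acc._2 + 1)

  // scanLeft keeps every intermediate accumulator, starting from the zero value (0, 0).
  val steps: List[(Int, Int)] = List(1, 2, 3, 3).scanLeft((0, 0))(seqOp)

  // The last accumulator is the result for the single partition.
  val result: (Int, Int) = steps.last

  def main(args: Array[String]): Unit = {
    steps.foreach(println) // prints (0,0), (1,1), (3,2), (6,3), (9,4), one per line
    println(s"Result: $result")
  }
}
```

Each printed pair corresponds to one row of the table: the first component is the sum so far, the second is the count so far.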
Case 2: With partitions. Suppose there are 2 partitions, {1,2} and {3,3}. seqOp, (x, y) => (x._1 + y, x._2 + 1), runs within each partition, and combOp, (x, y) => (x._1 + y._1, x._2 + y._2), then merges the per-partition results.
{1,2}
new value     x._1   y   x._2
(0+1,0+1)     0      1   0
(1+2,1+1)     1      2   1
Result1: (3,2)
{3,3}
new value     x._1   y   x._2
(0+3,0+1)     0      3   0
(3+3,1+1)     3      3   1
Result2: (6,2)
Final result (combOp merges Result1 and Result2): (3+6, 2+2) = (9,4)
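The two-partition case can be sketched the same way in plain Scala. The split into List(1, 2) and List(3, 3) is an assumption (Spark decides the real layout), and the names TwoPartitions, partitions and partials are hypothetical; the point is only to show seqOp running per partition and combOp merging afterwards.

```scala
object TwoPartitions {
  val zero: (Int, Int) = (0, 0)

  // seqOp: fold one element into the running (sum, count).
  val seqOp: ((Int, Int), Int) => (Int, Int) =
    (acc, elem) => (acc._1 + elem, acc._2 + 1)

  // combOp: merge two (sum, count) pairs component by component.
  val combOp: ((Int, Int), (Int, Int)) => (Int, Int) =
    (a, b) => (a._1 + b._1, a._2 + b._2)

  // Assumed partition layout, matching Case 2.
  val partitions: List[List[Int]] = List(List(1, 2), List(3, 3))

  // Step 1: each partition is folded independently with seqOp, starting from zero.
  val partials: List[(Int, Int)] = partitions.map(_.foldLeft(zero)(seqOp))

  // Step 2: the per-partition results are merged with combOp.
  val result: (Int, Int) = partials.foldLeft(zero)(combOp)

  def main(args: Array[String]): Unit = {
    println(partials) // List((3,2), (6,2))
    println(result)   // (9,4)
  }
}
```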
Upvotes: 0
Reputation: 74669
This is Spark 2.1.0 here (which should not matter much, but...)
Go to the official documentation of aggregate (aka scaladoc) and read:
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
The signature is as follows (removed the implicit parameter as not particularly interesting):
aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
The scaladoc says:
zeroValue the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
In your case, zeroValue is (0, 0).
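To see why the scaladoc asks for a neutral zeroValue, here is a rough plain-Scala model of that description: the zero value seeds every partition and also seeds the final merge. The function aggregateModel and the object ZeroValueModel are hypothetical names, not Spark API, and the partition layouts are assumptions chosen for illustration.

```scala
object ZeroValueModel {
  val seqOp: ((Int, Int), Int) => (Int, Int) =
    (acc, elem) => (acc._1 + elem, acc._2 + 1)
  val combOp: ((Int, Int), (Int, Int)) => (Int, Int) =
    (a, b) => (a._1 + b._1, a._2 + b._2)

  // Model only: zero starts each per-partition fold and the merge of the partial results.
  def aggregateModel(partitions: List[List[Int]], zero: (Int, Int)): (Int, Int) =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  val onePartition: List[List[Int]] = List(List(1, 2, 3, 3))
  val twoPartitions: List[List[Int]] = List(List(1, 2), List(3, 3))

  // With the neutral (0, 0) the partition layout does not matter.
  val neutralOne: (Int, Int) = aggregateModel(onePartition, (0, 0))  // (9,4)
  val neutralTwo: (Int, Int) = aggregateModel(twoPartitions, (0, 0)) // (9,4)

  // With a non-neutral (1, 0) the extra 1 is added once per partition plus once in the merge.
  val skewedOne: (Int, Int) = aggregateModel(onePartition, (1, 0))  // (11,4)
  val skewedTwo: (Int, Int) = aggregateModel(twoPartitions, (1, 0)) // (12,4)

  def main(args: Array[String]): Unit =
    println(List(neutralOne, neutralTwo, skewedOne, skewedTwo))
}
```

In this model a non-neutral zero makes the answer depend on how many partitions there are, which is why (0, 0) is the right choice for a sum and a count.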
seqOp an operator used to accumulate results within a partition
In your case, seqOp is (x, y) => (x._1 + y, x._2 + 1), a function that accepts the accumulated pair and the next element of the RDD. The parameters are unfortunately named x and y; I'd call them acc and elem at the very least, or even use pattern matching and a partial function, i.e. case ((sum, count), elem) => ...
Given you've got n partitions (you can check the number using rdd.getNumPartitions), seqOp is called once per element within each partition, which yields n partial results.
The scaladoc says:
combOp an associative operator used to combine results from different partitions
which means that combOp will combine all the partial results produced by seqOp by applying the function:
(x, y) => (x._1 + y._1, x._2 + y._2)
Again, it's written in a way that shows too much detail, which I'd even call noise. I'd write the function as follows:
{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }
Follow the types and give proper names, and ultimately everything in Scala becomes much easier ;-)
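As an illustration of "follow the types and give proper names", here is one possible way to write both functions with pattern matching. The type alias SumCount and the names sum, count and elem are my own choices, not anything Spark requires. Note that seqOp takes the accumulated pair and a single element, while combOp takes two accumulated pairs.

```scala
object NamedOps {
  // Hypothetical alias for the accumulator type U = (sum, count).
  type SumCount = (Int, Int)

  // seqOp: (U, T) => U, folds one element into the accumulator.
  val seqOp: (SumCount, Int) => SumCount = {
    case ((sum, count), elem) => (sum + elem, count + 1)
  }

  // combOp: (U, U) => U, merges two accumulators.
  val combOp: (SumCount, SumCount) => SumCount = {
    case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2)
  }

  val afterFirstElement: SumCount = seqOp((0, 0), 1)  // (1,1)
  val merged: SumCount = combOp((3, 2), (6, 2))       // (9,4)

  def main(args: Array[String]): Unit = {
    println(afterFirstElement)
    println(merged)
  }
}
```

With Spark available, these two values could be passed as the second argument list of rdd.aggregate((0, 0)) in place of the tuple-accessor lambdas.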
Upvotes: 1
Reputation: 13001
Basically the aggregate says: We want a tuple (a,b) where a is the sum of all elements and b is their count.
This is done by initializing to (0,0) and then we have two functions:
The first function just does the summing when we get a single element at a time, i.e. the tuple is updated from a single element by adding the value to the first element and adding 1 (count) to the second.
The second function merges two results, so it just does element-wise addition.
Let's consider the example of your input data:
Let's say 1,2 is in partition 1 and 3,3 is in partition 2.
Partition 1 calculation
Partition 1 would start with (0,0).
Then the first function begins to work.
When we add the 1 we get (1,1). The first element is the sum (0 + y, where y is 1) and the second is the count (0 + 1).
Now we add 2, so we get (1+2, 1+1) = (3,2). Again, the first element is the sum of the values we have seen so far and the second is their count.
Partition 2 calculation
On the second partition we again start with (0,0) and then we get (3,1) from the first 3 and (6,2) from the second.
Merging the results
Now the second function comes into play, merging the two: we merge (3,2) and (6,2) by adding them element-wise, which gives (3+6, 2+2) = (9,4).
Upvotes: 3