Sandeep Samal

Reputation: 143

How does RDD.aggregate action work (i.e. how to understand the parameters)?

Can anybody give a detailed explanation of how the aggregate action below produces the result (9,4) in Spark?

val rdd = sc.parallelize(List(1,2,3,3))

rdd.aggregate((0, 0))(
  (x, y) => (x._1 + y, x._2 + 1),
  (x, y) => (x._1 + y._1, x._2 + y._2))

res : (9,4)

Upvotes: 0

Views: 2593

Answers (3)

Ted Corleone

Reputation: 883

Let's keep it simple.

Case 1: a single partition, so only seqOp matters. Focus on (x, y) => (x._1 + y, x._2 + 1) and ignore combOp.

{1,2,3,3}
result      x._1    y      x._2
(0+1,0+1)   0       1      0
(1+2,1+1)   1       2      1
(3+3,2+1)   3       3      2
(6+3,3+1)   6       3      3
Result: (9,4)

Case 2: two partitions, {1,2} and {3,3}. seqOp, (x, y) => (x._1 + y, x._2 + 1), runs inside each partition; combOp, (x, y) => (x._1 + y._1, x._2 + y._2), then merges the per-partition results.

{1,2}
result      x._1    y      x._2
(0+1,0+1)   0       1      0
(1+2,1+1)   1       2      1
Result1: (3,2)

{3,3}
result      x._1    y      x._2
(0+3,0+1)   0       3      0
(3+3,1+1)   3       3      1
Result2: (6,2)

Final result (combOp on Result1 and Result2): (3+6, 2+2) = (9,4)
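The two cases above can be reproduced without a cluster. The following is only a sketch on plain Scala collections, not Spark itself: foldLeft stands in for what seqOp does inside a partition, and the split into {1,2} and {3,3} is assumed for illustration (Spark decides the real split). The object name AggregateCases is made up.

```scala
object AggregateCases {
  // The two functions from the question, with their types spelled out.
  val seqOp: ((Int, Int), Int) => (Int, Int) =
    (x, y) => (x._1 + y, x._2 + 1)
  val combOp: ((Int, Int), (Int, Int)) => (Int, Int) =
    (x, y) => (x._1 + y._1, x._2 + y._2)

  val zero: (Int, Int) = (0, 0)

  // Case 1: everything in one partition, so only seqOp is needed.
  val case1: (Int, Int) = List(1, 2, 3, 3).foldLeft(zero)(seqOp)

  // Case 2: an assumed split into two partitions.
  val partitions: List[List[Int]] = List(List(1, 2), List(3, 3))
  // seqOp folds each partition on its own, starting from the zero value.
  val partials: List[(Int, Int)] = partitions.map(_.foldLeft(zero)(seqOp))
  // combOp merges the per-partition results.
  val case2: (Int, Int) = partials.foldLeft(zero)(combOp)

  def main(args: Array[String]): Unit = {
    println(case1)    // (9,4)
    println(partials) // List((3,2), (6,2))
    println(case2)    // (9,4)
  }
}
```

Both cases give the same answer because addition is associative and (0, 0) is neutral for it; with a non-neutral zero value the result would depend on the number of partitions.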

Upvotes: 0

Jacek Laskowski

Reputation: 74669

This is Spark 2.1.0 here (which should not matter much, but...)

Go to the official documentation of aggregate (aka scaladoc) and read:

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

The signature is as follows (removed the implicit parameter as not particularly interesting):

aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

The scaladoc says:

zeroValue the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)

In your case, zeroValue is (0, 0).

seqOp an operator used to accumulate results within a partition

In your case, seqOp is (x, y) => (x._1 + y, x._2 + 1), a function that takes the accumulator pair x (of type U) and a single element y (of type T) and returns the updated pair. The names x and y are unfortunate; I'd call them acc and elem at the very least, or use pattern matching, i.e. case ((sum, count), elem) => ....

Given you've got n partitions (you can check the number using rdd.getNumPartitions), seqOp is called once per element, folding each partition separately starting from zeroValue, so you end up with n partial results.

The scaladoc says:

combOp an associative operator used to combine results from different partitions

which means that combOp will combine the per-partition results produced by seqOp, applying the function:

(x, y) => (x._1 + y._1, x._2 + y._2)

Again, it's badly written: the tuple accessors add so much noise that it's hard to see what is going on. I'd write the function as follows:

{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

Follow the types and give proper names, and ultimately everything in Scala becomes much easier ;-)
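To make "follow the types" concrete, here is a rough sketch of a local stand-in with the same shape as the aggregate signature quoted above. aggregateLocal is a hypothetical helper written for this illustration, not part of Spark, and the nested Seq only imitates partitions. Here T is Int and U is (Int, Int).

```scala
object NamedAggregate {
  // Hypothetical local stand-in for RDD.aggregate:
  // each inner Seq plays the role of one partition.
  def aggregateLocal[T, U](partitions: Seq[Seq[T]])(zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U =
    partitions
      .map(_.foldLeft(zeroValue)(seqOp)) // merge each T into a U, per partition
      .foldLeft(zeroValue)(combOp)       // merge the partial U's together

  // Same computation as in the question, with descriptive names.
  val sumAndCount: (Int, Int) =
    aggregateLocal(Seq(Seq(1, 2), Seq(3, 3)))((0, 0))(
      { case ((sum, count), elem) => (sum + elem, count + 1) },
      { case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }
    )

  def main(args: Array[String]): Unit =
    println(sumAndCount) // (9,4)
}
```

With the names in place, the types read directly off the patterns: seqOp receives a (sum, count) pair plus one element, while combOp receives two (sum, count) pairs.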

Upvotes: 1

Assaf Mendelson

Reputation: 13001

Basically the aggregate says: We want a tuple (a,b) where a is the sum of all elements and b is their count.

This is done by initializing to (0,0) and then we have two functions:

  • The first function just does the summing when we get a single element at a time, i.e. the tuple is updated from a single element by adding the value to the first element and adding 1 (count) to the second.

  • The second function merges two results, so it just does element-wise addition.

Let's consider the example of your input data:

Let's say 1,2 is in partition 1 and 3,3 is in partition 2.

Partition 1 calculation

Partition 1 would start with (0,0).

Then the first function begins to work.

When we add the 1 we get (1,1). The first element is the sum (0 + y where y is 1) and the second is the count (0 + 1).

Now we add 2, so we get (1+2, 1+1) = (3,2). Again, the first element is the sum of the values we have seen so far and the second is their count.

Partition 2 calculation

On the second partition we again start with (0,0) and then we get (3,1) from the first 3 and (6,2) from the second.

Merging the results

Now the second function comes into play, merging the two: (3,2) and (6,2) are combined by summing element-wise, giving (3+6, 2+2) = (9,4).
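The intermediate tuples in this walk-through can be printed with scanLeft, which behaves like a fold but keeps every accumulator state. This is a sketch on plain Scala lists under the same assumed partitioning ({1,2} and {3,3}); it does not touch Spark, and the object name AggregateTrace is made up.

```scala
object AggregateTrace {
  // First function: fold one element into the running (sum, count).
  val seqOp: ((Int, Int), Int) => (Int, Int) = {
    case ((sum, count), elem) => (sum + elem, count + 1)
  }

  // Every accumulator state per assumed partition, starting at (0,0).
  val trace1: List[(Int, Int)] = List(1, 2).scanLeft((0, 0))(seqOp)
  val trace2: List[(Int, Int)] = List(3, 3).scanLeft((0, 0))(seqOp)

  // Second function applied to the two final states: element-wise addition.
  val merged: (Int, Int) =
    (trace1.last._1 + trace2.last._1, trace1.last._2 + trace2.last._2)

  def main(args: Array[String]): Unit = {
    println(trace1) // List((0,0), (1,1), (3,2))
    println(trace2) // List((0,0), (3,1), (6,2))
    println(merged) // (9,4)
  }
}
```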

Upvotes: 3
