Vikram Singh Chandel

Reputation: 633

Why is the result of Spark reduceByKey not consistent?

I am trying to count the number of occurrences of each line via Spark, using Scala.
The following is my input:

1 vikram
2 sachin
3 shobit
4 alok
5 akul
5 akul
1 vikram
1 vikram
3 shobit
10 ashu
5 akul
1 vikram
2 sachin
7 vikram

Now I create two separate RDDs as follows.

val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map(s => (s.split(" ")(0), 1)) // creating a tuple (key, 1)
// now if I create an RDD as
val rd1 = m1.reduceByKey((a, b) => a + b)
rd1.collect().foreach(println)
// I get the correct output every time:
// output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

// but if I create an RDD as
val rd2 = m1.reduceByKey((a, b) => a + 1)
rd2.collect().foreach(println)
// I get inconsistent results, i.e. sometimes I get this (WRONG)
// output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
// and sometimes I get this (CORRECT)
// output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

I am unable to understand why this is happening and when to use which. I have also tried creating the RDD as

val m2 = f1.map(s => (s, 1))
val rd3 = m2.reduceByKey((a, b) => a + 1)
// the same issue occurs with a+1, but everything works fine with a+b
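
For reference, this can also be reproduced without HDFS in the spark-shell (input hard-coded; this assumes sc is available and the data is spread over more than one partition, like the HDFS file would be):

val lines = Seq(
  "1 vikram", "2 sachin", "3 shobit", "4 alok", "5 akul", "5 akul",
  "1 vikram", "1 vikram", "3 shobit", "10 ashu", "5 akul", "1 vikram",
  "2 sachin", "7 vikram"
)
val f1x = sc.parallelize(lines, numSlices = 4) // several partitions
val m1x = f1x.map(s => (s.split(" ")(0), 1))
m1x.reduceByKey((a, b) => a + b).collect().foreach(println) // stable across runs
m1x.reduceByKey((a, b) => a + 1).collect().foreach(println) // varies with partitioning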

Upvotes: 4

Views: 669

Answers (2)

Tzach Zohar

Reputation: 37822

reduceByKey assumes the passed function is commutative and associative (as the docs clearly state). Your first function, (a, b) => a + b, is both; (a, b) => a + 1 isn't.

Why? For one thing, reduceByKey applies the supplied function within each partition, and then again to the combined results of all partitions. In other words, b isn't always 1, so using a + 1 simply isn't correct.

Think of the following scenario: the input contains 4 records, split into two partitions.

Partition 1:

(aa, 1)
(aa, 1)

Partition 2:

(aa, 1)
(cc, 1)

reduceByKey(f) on this input might be calculated as follows:

val intermediate1 = f((aa, 1), (aa, 1)) 
val intermediate2 = f((aa, 1), (cc, 1))

val result = f(intermediate2, intermediate1)

Now, let's follow this with f = (a, b) => a + b

val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

val result = f(intermediate2, intermediate1)  // (aa, 3), (cc, 1)

And with f = (a, b) => a + 1:

val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

// this is where it goes wrong:
val result = f(intermediate2, intermediate1)  // (aa, 2), (cc, 1)

The main thing is that the order of the intermediate calculations isn't guaranteed and may change between executions; in the latter case of a non-commutative function, this means the results are sometimes wrong.
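
You can simulate the two phases in plain Scala to see the order-dependence directly (a minimal sketch; the partition layout and merge order are hard-coded for illustration):

val plus  = (a: Int, b: Int) => a + b
val buggy = (a: Int, b: Int) => a + 1

// phase 1: reduce aa's values within each partition
val intermediate1 = buggy(1, 1) // partition with (aa, 1), (aa, 1) => 2
val intermediate2 = 1           // other partition holds a single (aa, 1)

// phase 2: merge the partials -- the order is up to the scheduler
buggy(intermediate2, intermediate1) // 2 (wrong)
buggy(intermediate1, intermediate2) // 3 (right, by accident)
plus(intermediate2, intermediate1)  // 3 (right in either order)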

Upvotes: 8

Rakesh Rakshit

Reputation: 592

The function (a, b) => a + 1 is not associative. The associative law says:

f(a, f(b, c)) = f(f(a, b), c)

Assume the records below:

a = (x, 1)
b = (x, 1)
c = (x, 1)

Applying the function (a, b) => a + 1:

f(a, f(b, c)) = (x, 2)

But:

f(f(a, b), c) = (x, 3)

Hence, it is not associative and is not suitable for reduceByKey.
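
You can verify this in the Scala REPL, using plain Ints to stand in for the counts attached to key x:

val f = (a: Int, b: Int) => a + 1
f(1, f(1, 1)) // 2 -- f(a, f(b, c))
f(f(1, 1), 1) // 3 -- f(f(a, b), c)

// an associative function agrees in both groupings:
val g = (a: Int, b: Int) => a + b
g(1, g(1, 1)) // 3
g(g(1, 1), 1) // 3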

Upvotes: 3
