Reputation: 283
Consider the following Spark code, which should return the sum of the square roots of a sequence of integers:
// Create an RDD of a sequence of integers
val data = sc.parallelize(Range(0,100))
// Transform RDD to sequence of Doubles
val x = data.map(_.toDouble)
// Reduce the sequence as the sum of the sqrt of each integer
// (repeated 10 times with each result stored as a kv pair)
val xReduceMultipleTimes = Range(0,10).map(n=>(n, x.reduce((x,y)=>x+Math.sqrt(y))))
The reduce operation is repeated multiple times, sequentially, on the same RDD, and should return the same result each time. However, the output I'm getting from Spark is inconsistent, and nowhere near the correct value.
xReduceMultipleTimes: scala.collection.immutable.IndexedSeq[(Int, Double)] =
Vector((0,105.44288170056565), (1,245.5267945723869), (2,190.04459651854287),
(3,233.32211443903282), (4,190.04459651854287), (5,105.44288170056566),
(6,273.5022316153404), (7,105.44288170056568), (8,105.44288170056566),
(9,205.51799497636216))
The correct result should be 661.463, as verified with Mathematica.
Replacing Math.sqrt(y) with y yields the correct and consistent sum of the unrooted integers (i.e. 4950).
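For completeness, both expected values can be reproduced locally with plain Scala collections (just a sanity check, no Spark involved):
// Local sanity check of the expected values
Range(0, 100).map(Math.sqrt(_)).sum  // ≈ 661.463
Range(0, 100).sum                    // 4950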
Any ideas as to what could be causing the inconsistency?
Upvotes: 2
Views: 504
Reputation: 67135
The function you pass to reduce must be associative and commutative, and (x, y) => x + Math.sqrt(y) is neither. Keep in mind that reduce does two things: it first reduces locally within each partition, where the first parameter is indeed the accumulator and the second is the new value. It then merges the intermediate results with the same function, so an accumulator gets added to the square root of another partition's partial sum, which is not what you want. To express your intent, you must use aggregate:
// seed 0.0; per partition: accum + sqrt(value); merge partitions: plain addition
x.aggregate(0.0)((accum, value) => accum + Math.sqrt(value), _ + _)
I believe this captures your intent. The first parameter is the seed of 0.0, the second is a function that is run locally within each partition, and the final function merely adds up the per-partition results, which do not need to be square-rooted.
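To see concretely why the plain reduce drifts, the two-phase behavior can be imitated with ordinary Scala collections. This is only a sketch that assumes the data happens to be split into two partitions; the real split depends on your cluster:
// Pretend the RDD is split into two partitions
val parts = Seq(Range(0, 50).map(_.toDouble), Range(50, 100).map(_.toDouble))
// Phase 1: each partition is reduced locally with (x, y) => x + Math.sqrt(y)
val partials = parts.map(_.reduce((x, y) => x + Math.sqrt(y)))
// Phase 2: the partial results are merged with the same function,
// so one partial sum gets square-rooted -- not what was intended
val merged = partials.reduce((x, y) => x + Math.sqrt(y))
An alternative that sidesteps the problem entirely is to take the square roots in a map step first, e.g. x.map(Math.sqrt).sum(), so the combining function is plain addition, which is associative and commutative.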
Upvotes: 5