Reputation: 1332
I don't understand why my accumulator hasn't been updated properly by Spark.
object AccumulatorsExample extends App {
  val acc = sc.accumulator(0L, "acc")
  sc.range(0, 20000, step = 25).map { _ => acc += 1 }.count()
  assert(acc.value == 800) // fails: acc.value is not 800
}
My Spark config:
setMaster("local[*]") // should use 8 cpu cores
I'm not sure whether Spark distributes the accumulator computation across every core, and maybe that's the problem.
My question is: how can I aggregate all acc values into one single sum and get the right accumulator value (800)?
PS
If I restrict the number of cores with setMaster("local[1]"), then everything works fine.
Upvotes: 2
Views: 1002
Reputation: 330383
There are two different issues here:
You are extending App
instead of implementing a main
method. There are known issues with this approach, including incorrect accumulator behavior, so it shouldn't be used in Spark applications. This is most likely the source of the problem.
See for example SPARK-4170 for other possible issues related to extending App
.
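As a sketch of the first fix (assuming the same local[*] configuration and the pre-2.0 accumulator API from your snippet), the application can be restructured with an explicit main method instead of extending App:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorsExample {
  def main(args: Array[String]): Unit = {
    // Build the context explicitly inside main; extending App relies on
    // DelayedInit, which is known to interact badly with Spark (SPARK-4170)
    val conf = new SparkConf()
      .setAppName("AccumulatorsExample")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val acc = sc.accumulator(0L, "acc")
    sc.range(0, 20000, step = 25).map { _ => acc += 1 }.count()
    println(acc.value)

    sc.stop()
  }
}
```

With fields initialized inside main rather than via App's delayed initialization, the accumulator behaves as expected on the driver.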
You are using accumulators inside transformations. This means the accumulator can be incremented an arbitrary number of times (at least once when the given job is successful), for example when a stage is retried or a task is re-executed.
In general, if you require exact results, you should use accumulators only inside actions like foreach
and foreachPartition
, although it is rather unlikely you'll experience any issues in a toy application like this.
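A minimal sketch of the action-based approach (assuming an existing SparkContext sc, as in the question):

```scala
// Increment the accumulator inside an action instead of a transformation.
// Actions run each task's side effects as part of the job itself, so on a
// successful job each element contributes exactly one increment; with a
// transformation like map, stage retries can apply the increment again.
val acc = sc.accumulator(0L, "acc")
sc.range(0, 20000, step = 25).foreach { _ => acc += 1 }
println(acc.value) // read back on the driver after the action completes
```

Note that foreach returns Unit, so no separate count() is needed to trigger execution.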
Upvotes: 3