Aaron

Reputation: 1332

Spark: Accumulator does not work properly when I use it with a range

I don't understand why my accumulator hasn't been updated properly by Spark.

object AccumulatorsExample extends App {
  val acc = sc.accumulator(0L, "acc")
  sc.range(0, 20000, step = 25).map(_ => acc += 1).count()
  assert(acc.value == 800) // fails: acc.value is not 800
}

My Spark config:

setMaster("local[*]") // should use all 8 CPU cores

I'm not sure whether Spark distributes the accumulator computation across every core, and maybe that's the problem.

My question is: how can I aggregate all the acc values into one single sum and get the right accumulator value (800)?

PS

If I restrict the number of cores with setMaster("local[1]"), then everything works fine.

Upvotes: 2

Views: 1002

Answers (1)

zero323

Reputation: 330383

There are two different issues here:

  • You are extending App instead of implementing a main method. There are known issues related to this approach, including incorrect accumulator behavior, and because of that it shouldn't be used in Spark applications. This is most likely the source of the problem.

    See for example SPARK-4170 for other possible issues related to extending App.

  • You are using accumulators inside transformations. This means the accumulator can be incremented an arbitrary number of times (at least once per successful job, but possibly more if tasks are retried or stages recomputed).

    In general, if you require exact results, you should use accumulators only inside actions like foreach and foreachPartition, although it is rather unlikely you'll experience any issues in a toy application like this.
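To illustrate both points, here is a minimal sketch of the corrected program: an explicit main method instead of extending App, and the accumulator updated inside an action (foreach) rather than a transformation. It assumes the same Spark 1.x accumulator API used in the question, with the SparkContext created explicitly (the question's snippet left `sc` undefined):

    import org.apache.spark.{SparkConf, SparkContext}

    object AccumulatorsExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("acc")
        val sc = new SparkContext(conf)
        val acc = sc.accumulator(0L, "acc")
        // foreach is an action, so each element increments acc exactly
        // once in a successful run (barring task retries)
        sc.range(0, 20000, step = 25).foreach(_ => acc += 1)
        assert(acc.value == 800) // 800 elements in range(0, 20000, step = 25)
        sc.stop()
      }
    }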

Upvotes: 3
