Michael Zeltser

Reputation: 399

How to access accumulators in object outside the place where they were defined?

I'm defining my helper map function as a separate def in a helper object, and it does not "see" the accumulator defined earlier in the code. The Spark docs seem to recommend keeping "remote" functions inside an object, but how do I make that work with accumulators?

object mainlogic{
    val counter = sc.accumulator(0)
    val data = sc.textFile(...)// load logic here
    val myrdd = data.mapPartitionsWithIndex(mapFunction)
}

object helper{
  def mapFunction(...)={
      counter+=1 // not compiling
  }
}

Upvotes: 3

Views: 520

Answers (1)

Justin Pihony

Reputation: 67075

Something like that needs to be passed in as a parameter, just like any other value:

object mainlogic{
    val counter = sc.accumulator(0)
    val data = sc.textFile(...)// load logic here
    val myrdd = data.mapPartitionsWithIndex(mapFunction(counter, _, _))
}

object helper{
  def mapFunction(counter: Accumulator[Int], ...)={
      counter+=1 // compiles now that the accumulator is a parameter
  }
}
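
For completeness, here is a minimal runnable sketch of the same idea. It assumes the pre-2.0 Accumulator API (sc.accumulator) and a hypothetical input path, and wires the pieces together so the accumulator is visible where the map function runs:

import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object helper {
  // The accumulator is an explicit parameter; index and lines match the
  // (Int, Iterator[T]) => Iterator[U] shape mapPartitionsWithIndex expects.
  def mapFunction(counter: Accumulator[Int],
                  index: Int,
                  lines: Iterator[String]): Iterator[String] = {
    lines.map { line =>
      counter += 1 // in scope because it was passed in
      s"$index: $line"
    }
  }
}

object mainlogic {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("accumulator-example"))
    val counter = sc.accumulator(0)
    val data = sc.textFile("input.txt") // hypothetical path
    val myrdd = data.mapPartitionsWithIndex(helper.mapFunction(counter, _, _))
    myrdd.count() // an action forces the partitions to be evaluated
    println(s"lines seen: ${counter.value}")
  }
}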

Make sure to remember the note from the docs though:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
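
To make that caveat concrete, here is a short sketch (reusing sc and data from above, so the same assumptions apply): an update made inside an action such as foreach is applied exactly once, while an update made inside a transformation such as map can be re-applied whenever the RDD is recomputed.

val exactOnce = sc.accumulator(0)
data.foreach(_ => exactOnce += 1) // action: counted once per record

val maybeMore = sc.accumulator(0)
val mapped = data.map { line => maybeMore += 1; line }
mapped.count()
mapped.count() // without caching, the map re-runs, so maybeMore double-counts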

Upvotes: 1
