Reputation: 399
I'm defining a helper map function as a separate def in a helper object, and it does not "see" the accumulator defined earlier in the code. The Spark docs seem to recommend keeping "remote" functions inside an object, but how do I make that work with accumulators?
object mainlogic {
  val counter = sc.accumulator(0)
  val data = sc.textFile(...) // load logic here
  val myrdd = data.mapPartitionsWithIndex(mapFunction)
}

object helper {
  def mapFunction(...) = {
    counter += 1 // not compiling
  }
}
Upvotes: 3
Views: 520
Reputation: 67075
Something like that needs to be passed in as a parameter, just like any other value:
object mainlogic {
  val counter = sc.accumulator(0)
  val data = sc.textFile(...) // load logic here
  val myrdd = data.mapPartitionsWithIndex(mapFunction(counter, _, _))
}

object helper {
  def mapFunction(counter: Accumulator[Int], ...) = {
    counter += 1 // compiles now that counter is in scope as a parameter
  }
}
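For reference, a complete compiling sketch of this pattern might look like the following (assuming an existing SparkContext named sc and a hypothetical input path; the parameter and return types of mapFunction are filled in for illustration):

import org.apache.spark.Accumulator

object mainlogic {
  val counter = sc.accumulator(0)
  val data = sc.textFile("hdfs://...") // hypothetical path, load logic here
  val myrdd = data.mapPartitionsWithIndex(helper.mapFunction(counter, _, _))
}

object helper {
  // The accumulator arrives as an ordinary parameter, so this compiles,
  // and the closure Spark ships to executors carries it along.
  def mapFunction(counter: Accumulator[Int],
                  index: Int,
                  it: Iterator[String]): Iterator[String] = {
    it.map { line =>
      counter += 1
      line
    }
  }
}

Partially applying mapFunction(counter, _, _) yields a (Int, Iterator[String]) => Iterator[String], which is exactly the shape mapPartitionsWithIndex expects.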
Make sure to remember this note from the docs, though:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
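In practice that means counting inside a transformation such as map can over-count if a stage is retried, while an update performed in an action such as foreach is applied exactly once. A minimal sketch of the distinction (again assuming a SparkContext sc):

val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 100)

// Transformation: lazy, and the increment runs whenever this stage
// executes, so retries or re-evaluation can apply it more than once.
val mapped = rdd.map { x => acc += 1; x }

// Action: Spark guarantees each task's update is applied only once,
// even if the task is restarted.
rdd.foreach { _ => acc += 1 }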
Upvotes: 1