Reputation: 2948
I am trying to access an accumulator's value from inside a task running on the cluster, but when I do so it throws an exception:
can't read the accumulator's value
I tried to use row.localValue instead, but it returns the same number every time. Is there a workaround?
private def modifyDataset(
    data: String, row: org.apache.spark.Accumulator[Int]): Array[Int] = {
  val line = data.split(",")
  val lineSize = line.size
  val pairArray = new Array[Int](lineSize - 1)
  val a = row.value      // this is where the exception is thrown on the cluster
  pairArray(0) = a
  row += 1
  pairArray
}

val sc = Spark_Context.InitializeSpark
val row = sc.accumulator(1, "Rows")
val dataset = sc.textFile("path")
val pairInfoFile = dataset.flatMap { data => modifyDataset(data, row) }
  .persist(StorageLevel.MEMORY_AND_DISK)
pairInfoFile.count()
Upvotes: 1
Views: 4231
Reputation: 330393
It is simply not possible, and there is no workaround. Spark accumulators are write-only variables from the worker's perspective. Trying to read their value inside a task doesn't make sense, because there is no shared state between workers, and the local accumulator value reflects only the state of the current partition.
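A minimal sketch of the supported pattern, reusing the Spark_Context helper and the "path" input from the question (the per-line result below is a hypothetical placeholder): update the accumulator inside the task, run an action, and read .value only on the driver.

import org.apache.spark.storage.StorageLevel

val sc = Spark_Context.InitializeSpark            // helper assumed from the question
val rows = sc.accumulator(0, "Rows")

val pairInfoFile = sc.textFile("path")
  .flatMap { data =>
    rows += 1                                     // write-only update inside the task
    data.split(",").drop(1).map(_.trim.length)    // hypothetical per-line result
  }
  .persist(StorageLevel.MEMORY_AND_DISK)

pairInfoFile.count()                              // action triggers the job
println(rows.value)                               // safe: read only on the driver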
Generally speaking, accumulators are intended mostly for diagnostics and shouldn't be used as part of the application logic. When they are used inside transformations, the only guarantee you get is at-least-once execution.
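As a sketch of that caveat (hypothetical numbers, same old accumulator API as in the question): if the RDD is not cached, every action recomputes the transformation, so the accumulator updates are applied again.

val acc = sc.accumulator(0, "processed")
val mapped = sc.parallelize(1 to 100).map { x => acc += 1; x }

mapped.count()      // acc.value is 100
mapped.count()      // map is recomputed, so acc.value is now 200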
See also: How to print accumulator variable from within task (seem to "work" without calling value method)?
Upvotes: 2