Nick

Reputation: 2948

How to access the value of an accumulator in tasks?

I am trying to access an accumulator's value from inside a task running on the cluster, but when I do so it throws an exception:

can't read the accumulator's value

I tried to use row.localValue, but it returns the same numbers. Is there a workaround?

private def modifyDataset(
    data: String, row: org.apache.spark.Accumulator[Int]): Array[Int] = {

  val line = data.split(",")
  val lineSize = line.size
  val pairArray = new Array[Int](lineSize - 1)
  val a = row.value        // this read fails when executed inside a task
  pairArray(0) = a

  row += 1
  pairArray
}


import org.apache.spark.storage.StorageLevel

val sc = Spark_Context.InitializeSpark
val row = sc.accumulator(1, "Rows")

val dataset = sc.textFile("path")

val pairInfoFile = dataset.flatMap { data => modifyDataset(data, row) }
  .persist(StorageLevel.MEMORY_AND_DISK)
pairInfoFile.count()

Upvotes: 1

Views: 4231

Answers (1)

zero323

Reputation: 330393

It is simply not possible, and there is no workaround. Spark accumulators are write-only variables from the workers' perspective. Any attempt to read their value inside a task doesn't make sense, because there is no shared state between workers, and the local accumulator value reflects only the state of the current partition.
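
For illustration, here is a minimal sketch of the supported pattern, using the same Spark 1.x accumulator API as in the question (the app name, file path, and variable names are placeholders): tasks only add to the accumulator, and the driver reads value after an action has completed.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("accumulator-sketch")
val sc = new SparkContext(conf)

val rows = sc.accumulator(0, "Rows")

val dataset = sc.textFile("path")

val lineSizes = dataset.map { line =>
  rows += 1                   // adding to the accumulator inside a task is fine
  line.split(",").length
}

lineSizes.count()             // run an action so the updates reach the driver
println(rows.value)           // reading value is only valid here, on the driver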

Generally speaking, accumulators are intended mostly for diagnostics and shouldn't be used as part of the application logic. When they are used inside transformations, the only guarantee you get is at-least-once execution.
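
As a hypothetical illustration of what at-least-once means in practice: if an RDD is evaluated more than once and is not cached, the transformation, and therefore the accumulator update, runs again, so the counter ends up larger than the number of records.

val seen = sc.accumulator(0, "Seen")

val mapped = sc.parallelize(1 to 100).map { x =>
  seen += 1                   // update inside a transformation
  x
}

mapped.count()
mapped.count()                // the RDD is recomputed, so the updates are applied again

println(seen.value)           // 200 rather than 100 -- hence "at-least-once"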

See also: How to print accumulator variable from within task (seem to "work" without calling value method)?

Upvotes: 2
