Reputation: 2710
In the official Spark documentation, there is an example of an accumulator being used in a foreach call directly on an RDD:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
I implemented my own accumulator:
val myCounter = sc.accumulator(0)
val myRDD = sc.textFile(inputpath) // :spark.RDD[String]
myRDD.flatMap(line => foo(line)) // line 69
def foo(line: String) = {
  myCounter += 1 // line 82 throwing NullPointerException
  // compute something on the input
}
println(myCounter.value)
In a local setting, this works just fine. However, if I run this job on a Spark standalone cluster with several machines, the workers throw a
13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247
java.lang.NullPointerException
at MyClass$.foo(MyClass.scala:82)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
at the line which increments the accumulator myCounter.
My question is: Can accumulators only be used in "top-level" anonymous functions which are applied directly to RDDs and not in nested functions? If yes, why does my call succeed locally and fail on a cluster?
Edit: increased the verbosity of the exception.
Upvotes: 11
Views: 3124
Reputation: 1
If you use "flatMap" then "myCounter" will not update because "flatMap" is lazy function. You can use this code:
def foo(line: String) = { myCounter += 1 }  // define foo before calling it
myRDD.foreach(line => foo(line))            // foreach is an action, so it runs immediately
println(myCounter.value)
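If you need to keep the flatMap itself, another option is to follow the transformation with an action. The following is only a sketch, assuming the spark-shell's sc and the inputpath from the question, with split(" ") as a stand-in for the real per-line computation:
val myCounter = sc.accumulator(0)
val myRDD = sc.textFile(inputpath)
val words = myRDD.flatMap { line =>
  myCounter += 1   // updated on the executors once a job actually runs
  line.split(" ")  // stand-in for "compute something on the input"
}
words.count()      // any action (count, saveAsTextFile, ...) forces the lazy flatMap
println(myCounter.value)
Note that Spark only guarantees that each task's update is applied exactly once for accumulator updates performed inside actions; for updates made inside transformations such as flatMap, a retried task or re-executed stage may apply the update more than once.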
Upvotes: -1
Reputation: 31
In my case too, the accumulator was null in the closure when I used 'extends App' to create a Spark application, as shown below:
object AccTest extends App {
  val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val accum = sc.accumulator(0, "My Accumulator")
  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  println("count:" + accum.value)

  sc.stop
}
I replaced extends App with a main() method and it worked in a YARN cluster on HDP 2.4:
object AccTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val accum = sc.accumulator(0, "My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    println("count:" + accum.value)

    sc.stop
  }
}
This most likely happens because scala.App relies on DelayedInit: vals defined in the object body (including the accumulator) are not initialized in the object's constructor, so when the closure is deserialized on an executor the field can still be null. The Spark documentation also recommends defining a main() method instead of extending scala.App.
Upvotes: 3
Reputation: 13927
What if you define the function like this:
def foo(line: String, myc: org.apache.spark.Accumulator[Int]) = {
  myc += 1
}
And then call it like this?
foo(line, myCounter)
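Putting the pieces together, a rough end-to-end sketch (the object name, the split call and the args(0) input path are placeholders, and count() is just one possible action to trigger the job):
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  def foo(line: String, myc: org.apache.spark.Accumulator[Int]): Seq[String] = {
    myc += 1          // the accumulator is passed in explicitly instead of being captured
    line.split(" ")   // placeholder for the real per-line computation
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AccumulatorExample"))
    val myCounter = sc.accumulator(0)
    val myRDD = sc.textFile(args(0))                      // placeholder input path
    myRDD.flatMap(line => foo(line, myCounter)).count()   // the action actually runs the job
    println(myCounter.value)
    sc.stop()
  }
}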
Upvotes: 1