Reputation: 419
I am facing above exception when I am trying to apply a method(ComputeDwt) on RDD[(Int,ArrayBuffer[(Int,Double)])]
input.
I am even using extends Serialization
option to serialize objects in spark.Here is the code snippet.
input:series:RDD[(Int,ArrayBuffer[(Int,Double)])]
DWTsample extends Serialization is a class having computeDwt function.
sc: sparkContext
val kk:RDD[(Int,List[Double])]=series.map(t=>(t._1,new DWTsample().computeDwt(sc,t._2)))
Error:
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
Could anyone suggest me what could be the problem and what should be done to overcome this issue?
Upvotes: 7
Views: 8057
Reputation: 722
I know I'm late to the party but I'm posting this answer as a courtesy to whoever gets this error.
First, when a method inside of a class is called from an RDD map function, spark will attempt to serialize the class encapsulating the method being called. This results is an error because the class contains a reference to the sparkContext (which isn't a serializable object).
The following Stack Overflow questions give an overview of the generic issue and a few solutions. One of which is to create a companion object to the class that holds the private methods being called from the map function.
Overview of the issue: Apache Spark map function org.apache.spark.SparkException: Task not serializable
A possible solution: How to qualify methods as static in Scala?
Upvotes: 0
Reputation: 13801
The line
series.map(t=>(t._1,new DWTsample().computeDwt(sc,t._2)))
references the SparkContext (sc
) but SparkContext isn't serializable. SparkContext is designed to expose operations that are run on the driver; it can't be referenced/used by code that's run on workers.
You'll have to re-structure your code so that sc
isn't referenced in your map function closure.
Upvotes: 20