yh18190

Reputation: 419

Spark job failed due to java.io.NotSerializableException: org.apache.spark.SparkContext

I am facing the above exception when I try to apply a method (computeDwt) to an RDD[(Int,ArrayBuffer[(Int,Double)])] input. I am even extending Serialization to serialize the objects in Spark. Here is the code snippet.

input: series: RDD[(Int,ArrayBuffer[(Int,Double)])]
DWTsample is a class that extends Serialization and has a computeDwt function.
sc: SparkContext

val kk: RDD[(Int, List[Double])] = series.map(t => (t._1, new DWTsample().computeDwt(sc, t._2)))

Error:
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Could anyone suggest what the problem could be and what should be done to overcome this issue?

Upvotes: 7

Views: 8057

Answers (2)

Nizar

Reputation: 722

I know I'm late to the party but I'm posting this answer as a courtesy to whoever gets this error.

First, when a method inside a class is called from an RDD map function, Spark will attempt to serialize the class encapsulating the method being called. This results in an error because the class contains a reference to the SparkContext (which isn't a serializable object).
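For illustration, here is a minimal sketch of the pattern that triggers this; the class and method names are hypothetical and not taken from the question:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// `sc` is stored as a field, so it travels with the instance whenever
// Spark serializes the closure's enclosing object.
class Driver(val sc: SparkContext) extends Serializable {
  private def helper(x: Int): Int = x + 1

  def transform(rdd: RDD[Int]): RDD[Int] =
    // Calling an instance method from inside map captures `this`, so Spark
    // attempts to serialize the whole Driver, including the SparkContext
    // field, and fails with NotSerializableException.
    rdd.map(x => helper(x))
}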

The following Stack Overflow questions give an overview of the generic issue and a few solutions, one of which is to create a companion object for the class that holds the private methods being called from the map function (see the sketch after the links below).

Overview of the issue: Apache Spark map function org.apache.spark.SparkException: Task not serializable

A possible solution: How to qualify methods as static in Scala?
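As a concrete (hypothetical) example of the companion-object approach, assuming computeDwt can work on the per-record data alone without a SparkContext (the real method body isn't shown in the question):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

// The map closure references only the companion object's method, so no
// instance (and therefore no SparkContext) has to be serialized.
object DWTsample {
  def computeDwt(data: ArrayBuffer[(Int, Double)]): List[Double] =
    data.map(_._2).toList   // placeholder for the actual wavelet-transform logic
}

// Usage, with series: RDD[(Int, ArrayBuffer[(Int, Double)])] as in the question:
// val kk: RDD[(Int, List[Double])] =
//   series.map { case (key, values) => (key, DWTsample.computeDwt(values)) }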

Upvotes: 0

Josh Rosen

Reputation: 13801

The line

series.map(t=>(t._1,new DWTsample().computeDwt(sc,t._2)))

references the SparkContext (sc) but SparkContext isn't serializable. SparkContext is designed to expose operations that are run on the driver; it can't be referenced/used by code that's run on workers.

You'll have to re-structure your code so that sc isn't referenced in your map function closure.
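For example, one possible restructuring, sketched under the assumption that computeDwt can be rewritten so it no longer needs the SparkContext (the real method body isn't shown in the question, and the object name here is hypothetical):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

// Hypothetical rewrite of DWTsample: the SparkContext parameter is dropped,
// so the map closure no longer needs anything non-serializable.
class DWTsample extends Serializable {
  def computeDwt(data: ArrayBuffer[(Int, Double)]): List[Double] =
    data.map(_._2).toList   // placeholder for the real wavelet-transform logic
}

object Restructured {
  // series: RDD[(Int, ArrayBuffer[(Int, Double)])], as in the question.
  def run(series: RDD[(Int, ArrayBuffer[(Int, Double)])]): RDD[(Int, List[Double])] =
    // sc is never referenced here and DWTsample is created inside the closure,
    // so only serializable values are shipped to the workers.
    series.map { case (key, values) => (key, new DWTsample().computeDwt(values)) }
}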

Upvotes: 20
