Javier de la Rosa

Reputation: 237

Spark Streaming: task "predict" not serializable

I am trying to write a Spark Streaming program that uses a model to make predictions, but when I run it I get this error: Task not serializable.

Code:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.model.DecisionTreeModel

// Load the saved model on the driver, then use it inside map.
val model = sc.objectFile[DecisionTreeModel]("DecisionTreeModel").first()
val parsedData = reducedData.map { line =>
  val arr = Array(line._2._1, line._2._2, line._2._3, line._2._4, line._2._5, line._2._6,
    line._2._7, line._2._8, line._2._9, line._2._10, line._2._11)
  val vector = LabeledPoint(line._2._4, Vectors.dense(arr))
  model.predict(vector.features)
}

Here is the error I get:

scala> val parsedData = reducedData.map { line =>
     |     val arr = Array(line._2._1,line._2._2,line._2._3,line._2._4,line._2._5,line._2._6,line._2._7,line._2._8,line._2._9,line._2._10,line._2._11)
     |     val vector=LabeledPoint(line._2._4, Vectors.dense(arr))
     |     model.predict(vector.features)
     | }
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
    at .......

How can I solve this issue?

Thanks!

Upvotes: 0

Views: 480

Answers (1)

Eswara Reddy Adapa

Reputation: 995

Refer to this link: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

In your case, "model" is instantiated in driver and used in map which causes the object to be sent over network from driver to executors, so it should be serializable. If you cannot make model serializable, try avoiding having to serialize by instantiating model inside map.You may also need to control how often you create this object within executor - once per row(default), once per task(i.e., thread) or once per executor(i.e, jvm).

Finally, I don't think you can have a single global "model" object that you mutate from multiple executors, just in case that's what you are looking for (irrespective of whether you make it serializable or not). Comments are welcome on this point.
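That said, if the model itself does serialize (yours was read back with objectFile, which suggests it does), a broadcast variable is the usual way to give every executor a read-only copy instead of shipping it inside each task's closure. A rough sketch based on your code:

// Broadcast the model once; executors read it via .value and never mutate it.
val modelBc = sc.broadcast(model)

val predictions = reducedData.map { line =>
  val arr = Array(line._2._1, line._2._2, line._2._3, line._2._4, line._2._5,
    line._2._6, line._2._7, line._2._8, line._2._9, line._2._10, line._2._11)
  modelBc.value.predict(Vectors.dense(arr))
}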

Upvotes: 1
