Reputation: 237
I am trying to write a Spark Streaming program that uses a model to make predictions, but I get this error when I do so: Task not serializable.
Code:
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

val model = sc.objectFile[DecisionTreeModel]("DecisionTreeModel").first()
val parsedData = reducedData.map { line =>
  val arr = Array(line._2._1, line._2._2, line._2._3, line._2._4, line._2._5, line._2._6,
    line._2._7, line._2._8, line._2._9, line._2._10, line._2._11)
  val vector = LabeledPoint(line._2._4, Vectors.dense(arr))
  model.predict(vector.features)
}
Here is the error:
scala> val parsedData = reducedData.map { line =>
| val arr = Array(line._2._1,line._2._2,line._2._3,line._2._4,line._2._5,line._2._6,line._2._7,line._2._8,line._2._9,line._2._10,line._2._11)
| val vector=LabeledPoint(line._2._4, Vectors.dense(arr))
| model.predict(vector.features)
| }
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
at .......
How can I solve this issue?
Thanks!
Upvotes: 0
Views: 480
Reputation: 995
Refer to this link: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
In your case, "model" is instantiated in driver and used in map which causes the object to be sent over network from driver to executors, so it should be serializable. If you cannot make model serializable, try avoiding having to serialize by instantiating model inside map.You may also need to control how often you create this object within executor - once per row(default), once per task(i.e., thread) or once per executor(i.e, jvm).
Finally, I don't think you can have a single global "model" object that you can mutate from multiple executors, in case that's what you are looking for (irrespective of whether you make it serializable or not). Comments are welcome on this point.
Upvotes: 1