Reputation: 81
When a lambda passed to a Spark RDD operation refers to objects outside its own scope, Spark includes the necessary context in the serialized task it ships for distributed execution. In the simple example below, why does it serialize the entire OuterClass instance rather than just the multiplier? My suspicion is that multiplier is really a Scala getter method under the hood, so the closure has to keep a reference to the enclosing instance. Declaring OuterClass extends Serializable does work, but it introduces unnecessary constraints. I would really appreciate a way to make this work without making OuterClass serializable.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ClosureTest {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("test"))
    println(new OuterClass(10).sparkSumProd(sc.parallelize(Seq(1, 2, 3))))
  }

  class OuterClass(multiplier: Int) {
    def sparkSumProd(data: RDD[Int]): Double = {
      data.map { // throws SparkException: Task not serializable
        v => v * multiplier
      }.sum()
    }
  }
}
Here is the output from Spark's SerializationDebugger:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)
    at ClosureTest$OuterClass.sparkSumProd(ClosureTest.scala:14)
    at ClosureTest$.main(ClosureTest.scala:10)
    at ClosureTest.main(ClosureTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: ClosureTest$OuterClass
Serialization stack:
    - object not serializable (class: ClosureTest$OuterClass, value: ClosureTest$OuterClass@36a7abe1)
    - field (class: ClosureTest$OuterClass$$anonfun$sparkSumProd$1, name: $outer, type: class ClosureTest$OuterClass)
    - object (class ClosureTest$OuterClass$$anonfun$sparkSumProd$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 17 more
Upvotes: 0
Views: 1334
Reputation: 81
Just copying the class-level value into a local variable makes it work: the generated closure then captures only the local Int m, instead of the $outer reference to the whole OuterClass instance that shows up in the serialization stack.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ClosureTest {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("test"))
    println(new OuterClass(10).sparkSumProd(sc.parallelize(Seq(1, 2, 3))))
  }

  class OuterClass(multiplier: Int) {
    def sparkSumProd(data: RDD[Int]): Double = {
      val m = multiplier // copy the field into a local val; the closure now captures only this Int
      data.map {
        v => v * m
      }.sum()
    }
  }
}
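
If you want to sanity-check a closure before handing it to Spark, you can run it through plain Java serialization yourself, which is essentially what the ClosureCleaner.ensureSerializable / JavaSerializationStream frames in the stack trace above are doing. This is only a rough sketch; SerializationCheck and isSerializable are made-up names, not Spark API:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Try to Java-serialize the given object and report whether it succeeds.
  def isSerializable(closure: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(closure)
      true
    } catch {
      case _: NotSerializableException => false
    }
}

With the original code, the function passed to map fails this check because its $outer field holds the non-serializable OuterClass; after copying multiplier into val m, the closure only drags along an Int and serializes fine.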
Upvotes: 1