Reputation: 523
I want to access a companion object's method inside a transformation on an RDD. Why does the following not work:
import org.apache.spark.rdd.RDD
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
class Abc {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}
implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc]
new Abc().transform(sc.parallelize(1 to 10)).collect
The above piece of code throws a java.io.NotSerializableException:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at Abc.transform(<console>:19)
... 47 elided
Caused by: java.io.NotSerializableException: Abc
Serialization stack:
- object not serializable (class: Abc, value: Abc@4f598dfb)
- field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc)
- object (class Abc$$anonfun$transform$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 57 more
Even defining an Encoder for the class Abc doesn't help here. But the more important question is: why is serialization of an object of class Abc being attempted at all? My first thought was that the companion object is a singleton object of the class, so maybe there's an attempt to serialize it. But that doesn't seem to be the case, because when I call Abc.fn from another class:
class Xyz {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}
implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz]
new Xyz().transform(sc.parallelize(1 to 10)).collect
I get a java.io.NotSerializableException: Xyz
Upvotes: 3
Views: 3326
Reputation: 23119
The main abstraction of Spark is the RDD, which is partitioned across the nodes of the cluster. When a transformation runs on an RDD, the closure is serialized on the driver node and distributed to the appropriate worker nodes; the workers then deserialize it and execute it.
In your case, the class Abc cannot be serialized and distributed to the worker nodes. You need to make the class Abc extend Serializable:
class Abc extends Serializable {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}
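With Serializable mixed in, the Abc instance captured by the map closure (the $outer reference shown in the serialization stack above) can be serialized along with the task and shipped to the executors.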
Upvotes: 2
Reputation: 121829
Here is a great article that discusses "serializable" vs. "non-serializable" objects in Apache Spark:
Using Non-Serializable Objects in Apache Spark, by Nicola Ferraro
The article offers several suggestions as to:
What's going on in your particular case
Some alternatives so your object doesn't need to be "serializable" (one such alternative is sketched below)
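One common pattern along those lines, shown here only as a minimal sketch under my own assumptions (the article's exact examples differ; Converter is a hypothetical non-serializable helper), is to construct the helper inside mapPartitions so it is created on the executors and never has to be serialized from the driver:

import org.apache.spark.rdd.RDD

// Hypothetical helper that is not Serializable
class Converter {
  def convert(x: Int): Double = x.toDouble
}

def transform(x: RDD[Int]): RDD[Double] =
  x.mapPartitions { iter =>
    val converter = new Converter() // created on each executor, never shipped from the driver
    iter.map(converter.convert)
  }

Because the closure passed to mapPartitions only refers to the Converter class rather than to an instance that already exists on the driver, nothing non-serializable is captured, so the "Task not serializable" error does not arise.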
Upvotes: 3