user34812

Reputation: 523

Why does this Spark code throw java.io.NotSerializableException?

I want to call a method of a companion object inside a transformation on an RDD. Why does the following not work?

import org.apache.spark.rdd.RDD
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class Abc {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc]

new Abc().transform(sc.parallelize(1 to 10)).collect

The above piece of code throws a java.io.NotSerializableException:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at Abc.transform(<console>:19)
  ... 47 elided
Caused by: java.io.NotSerializableException: Abc
Serialization stack:
        - object not serializable (class: Abc, value: Abc@4f598dfb)
        - field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc)
        - object (class Abc$$anonfun$transform$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 57 more

Even defining an Encoder for the class Abc doesn't help here. But the more important question is: why is serialization of an instance of class Abc being attempted at all? My first thought was that the companion object is a singleton instance of the class, so maybe Spark is trying to serialize that. But that doesn't seem to be the case, because when I call Abc.fn from another class:

class Xyz {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz]

new Xyz().transform(sc.parallelize(1 to 10)).collect

I get a java.io.NotSerializableException: Xyz

Upvotes: 3

Views: 3326

Answers (2)

koiralo

Reputation: 23119

The main abstraction in Spark is the RDD, which is partitioned across the nodes of the cluster. When a job runs on the RDD, the function passed to a transformation such as map is serialized on the driver and shipped to the worker nodes, which then deserialize and execute it.

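In your case, the closure passed to map holds a reference to the enclosing Abc instance (the $outer field in your serialization stack), and the class Abc cannot be serialized and sent to the worker nodes. You need to make Abc extend Serializable:

class Abc extends Serializable {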
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}
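
To see why an instance ends up being serialized at all, here is a minimal sketch that needs no Spark, only plain Java serialization (which is what the JavaSerializerInstance frame in the stack trace shows being used for the closure). ClosureCheck, Holder and Demo are hypothetical names made up for this illustration, and the exact capture rules depend on the Scala version and on whether the code is run in the REPL; this assumes compiled code on Scala 2.12 or later.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: attempts plain Java serialization of any object.
object ClosureCheck {
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}

// Hypothetical class, deliberately NOT Serializable (like Abc in the question).
class Holder {
  val factor: Double = 2.0

  // Reads this.factor, so the function object keeps a reference to the Holder.
  def capturing: Int => Double = (x: Int) => x * factor

  // Copies the value into a local first, so the function only holds a Double.
  def standalone: Int => Double = {
    val localFactor = factor
    (x: Int) => x * localFactor
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(ClosureCheck.isJavaSerializable(h.capturing))
    println(ClosureCheck.isJavaSerializable(h.standalone))
  }
}
```

Under those assumptions the first check should fail and the second should succeed: serializing a function also serializes everything the function references, including the enclosing instance if it is referenced.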

Upvotes: 2

paulsm4

Reputation: 121829

Here is a great article that discusses "serializable" vs. "non-serializable" objects in Apache Spark:

Using Non-Serializable Objects in Apache Spark, Nicola Ferraro

The article offers several suggestions as to:

  • What's going on in your particular case

  • Some alternatives so your object doesn't need to be "serializable"
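
As an illustration of the second point, one commonly used pattern (not necessarily the one the article recommends) is to mark a non-serializable member as @transient lazy val, so that it is skipped during serialization and rebuilt on first use wherever the object is deserialized. The sketch below uses plain Java serialization instead of Spark; Scaler, Worker and RoundTrip are hypothetical names, and the behaviour is assumed for Scala 2.12/2.13.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a resource that cannot be serialized.
class Scaler {
  def scale(x: Int): Double = x.toDouble
}

// Hypothetical wrapper: Serializable itself, but the Scaler field is transient,
// so it is not written out and is re-created lazily after deserialization.
class Worker extends Serializable {
  @transient lazy val scaler: Scaler = new Scaler
  def run(x: Int): Double = scaler.scale(x)
}

// Hypothetical helper: serializes an object to bytes and reads it back.
object RoundTrip {
  def copy[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val w = new Worker
    w.run(1)                      // forces the lazy val before serialization
    val copied = RoundTrip.copy(w) // succeeds although Scaler is not Serializable
    println(copied.run(3))
  }
}
```

The enclosing class still has to be Serializable here; only the member that cannot be serialized is excluded.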

Upvotes: 3
