Carsten
Carsten

Reputation: 2040

Spark: Task not serializable (Broadcast/RDD/SparkContext)

There are numerous questions about Task is not serializable in Spark. However, this case seems quite particular.

I have created a class:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
    .persist()
  val sc = allEs.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  [...]

The class defines the following method:

private def centroidDistances(v: Vector): Array[Double] = {
  centroids.value.map(c => (centroids.value.indexOf(c), Vectors.sqdist(v, c)))
    .sortBy(_._1)
    .map(_._2)
}

However, when the class is called, a Task is not serializable exception is thrown.

Strange enough, a tiny change in the header of class Neighbours suffices to fix the issue. Instead of creating a val sc: SparkContext to use for broadcasting, I merely inline the code that creates the Spark context:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
  .setName("embeddings")
  .persist()
  val centroids = allEmbeddings.sparkContext(m.clusterCenters)
  [...]

My question is: how does the second variant make a difference? What goes wrong in the first one? Intuitively, this should be merely syntactic sugar, is this a bug in Spark?

I use Spark 1.4.1 on a Hadoop/Yarn cluster.

Upvotes: 2

Views: 4227

Answers (1)

mattinbits
mattinbits

Reputation: 10428

When you define

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  ...
  val sc = allEmbeddings.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  ...
}

You have made sc into a class variable, meaning it could be accessed from an instance of Neighbours e.g. neighbours.sc. This means that sc needs to be serializable, which is it not.

When you inline the code, only the final value of centroids needs to be serializable. centroids is of type Broadcast which is Serializable.

Upvotes: 1

Related Questions