Adam Pauls

Reputation: 23

Why does Spark need to serialize data in an RDD for each task it runs?

Even with a .cache()d RDD, Spark still seems to serialize the data for each task run. Consider this code:

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

class LoggingSerializable() extends Externalizable {
  override def writeExternal(out: ObjectOutput): Unit = {
    println("xxx serializing")
  }

  override def readExternal(in: ObjectInput): Unit = {
    println("xxx deserializing")
  }
}

object SparkSer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSer").setMaster("local")
    val spark = new SparkContext(conf)
    val rdd: RDD[LoggingSerializable] = spark.parallelize(Seq(new LoggingSerializable())).cache()
    println("xxx done loading")
    rdd.foreach(ConstantClosure)
    println("xxx done 1")
    rdd.foreach(ConstantClosure)
    println("xxx done 2")
    spark.stop()
  }
}

object ConstantClosure extends (LoggingSerializable => Unit) with Serializable {
  def apply(t: LoggingSerializable): Unit = {
    println("xxx closure ran")
  }
}

It prints

xxx done loading
xxx serializing
xxx deserializing
xxx closure ran
xxx done 1
xxx serializing
xxx deserializing
xxx closure ran
xxx done 2

Even though I called .cache() on rdd, Spark still serializes the data for each call to .foreach. The official docs say

When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it).

and that MEMORY_ONLY means

Store RDD as deserialized Java objects in the JVM.

Note that Spark seems to serialize the data when it serializes the task, but ConstantClosure does not close over anything, so I don't understand why it would need to serialize any data.

I am asking because I would like to be able to run Spark in local mode without any performance loss, but having to serialize large elements in an RDD for every action can be very costly. I am not sure whether this problem is unique to local mode; surely Spark can't be sending the data in an RDD over the wire to workers for every action when the RDD is cached.

I'm using spark-core 3.0.0.

Upvotes: 2

Views: 3988

Answers (2)

zsxwing

Reputation: 20826

This is because you are using parallelize. parallelize uses a special RDD, ParallelCollectionRDD, which puts the data into its Partitions. A Partition defines a Spark task, and it is sent to the executors inside a Spark task (ShuffleMapTask or ResultTask). If you print the stack trace in readExternal and writeExternal, you should be able to see that it happens while serializing and deserializing a Spark task.

In other words, for ParallelCollectionRDD the data is part of the task metadata, and Spark has to send tasks to the executors to run them; that's where the serialization happens.
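
For example, a debugging variant of the question's LoggingSerializable (just a sketch; the exception messages are arbitrary markers) that dumps the call stack on every (de)serialization:

import java.io.{Externalizable, ObjectInput, ObjectOutput}

class LoggingSerializable() extends Externalizable {
  override def writeExternal(out: ObjectOutput): Unit = {
    println("xxx serializing")
    // Print where the serialization was triggered; the frames should come from
    // Spark's task serialization path rather than from your own code.
    new Exception("who is serializing me?").printStackTrace()
  }

  override def readExternal(in: ObjectInput): Unit = {
    println("xxx deserializing")
    new Exception("who is deserializing me?").printStackTrace()
  }
}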

Most other RDDs read their data from external systems (such as files), so they don't have this behavior.
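
As a rough illustration (a sketch, not from the question: the file path is made up, and it reuses LoggingSerializable and ConstantClosure from the question), building the RDD from a file instead of parallelize keeps the elements out of the serialized task, so you would expect no "xxx serializing" lines for the data:

import org.apache.spark.{SparkConf, SparkContext}

object SparkSerFromFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSerFromFile").setMaster("local")
    val spark = new SparkContext(conf)

    // Hypothetical input file; each element is created on the executor side,
    // so the elements themselves are never part of the serialized task.
    val rdd = spark.textFile("/tmp/lines.txt")
      .map(_ => new LoggingSerializable())
      .cache()

    rdd.foreach(ConstantClosure) // first action: computes and caches the partitions
    rdd.foreach(ConstantClosure) // second action: reuses the cached, deserialized objects

    spark.stop()
  }
}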

Upvotes: 3

Sean Owen

Reputation: 66886

I agree that behavior looks surprising. Off the top of my head, I might guess that it's because caching the blocks is asynchronous, and all of this happens very fast. It's possible it simply does not wait around for the cached partition to become available and recomputes it the second time.

To test that hypothesis, introduce a lengthy wait before the second foreach just to see if that changes things.
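
Something along these lines (a sketch against the question's code; the sleep length is arbitrary):

    rdd.foreach(ConstantClosure)
    println("xxx done 1")
    // Give the asynchronous caching time to finish before the second action,
    // to see whether the second run still (de)serializes the data.
    Thread.sleep(10000)
    rdd.foreach(ConstantClosure)
    println("xxx done 2")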

Upvotes: 1
