Reputation: 23
Even with a .cache()'d RDD, Spark still seems to serialize the data for each task run. Consider this code:
import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

class LoggingSerializable() extends Externalizable {
  override def writeExternal(out: ObjectOutput): Unit = {
    println("xxx serializing")
  }

  override def readExternal(in: ObjectInput): Unit = {
    println("xxx deserializing")
  }
}

object SparkSer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSer").setMaster("local")
    val spark = new SparkContext(conf)
    val rdd: RDD[LoggingSerializable] = spark.parallelize(Seq(new LoggingSerializable())).cache()
    println("xxx done loading")
    rdd.foreach(ConstantClosure)
    println("xxx done 1")
    rdd.foreach(ConstantClosure)
    println("xxx done 2")
    spark.stop()
  }
}

object ConstantClosure extends (LoggingSerializable => Unit) with Serializable {
  def apply(t: LoggingSerializable): Unit = {
    println("xxx closure ran")
  }
}
It prints
xxx done loading
xxx serializing
xxx deserializing
xxx closure ran
xxx done 1
xxx serializing
xxx deserializing
xxx closure ran
xxx done 2
Even though I called .cache() on rdd, Spark still serializes the data for each call to .foreach. The official docs say:
When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it).
and that MEMORY_ONLY means:
Store RDD as deserialized Java objects in the JVM.
Note that Spark tries to serialize the data when it's serializing the task, but ConstantClosure does not close over anything, so I don't understand why it would need to serialize any data.
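To double-check that the closure itself carries nothing, I can serialize ConstantClosure directly with plain Java serialization and look at the size (just an illustration outside Spark, not Spark's own closure serializer):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize the ConstantClosure singleton with plain Java serialization.
// Since it captures no fields, the payload is essentially just the class
// descriptor -- a few dozen bytes, with no RDD data in it.
object ClosureSizeCheck {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(ConstantClosure)
    out.close()
    println(s"xxx serialized closure size: ${bytes.size()} bytes")
  }
}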
I am asking because I would like to be able to run Spark in local mode without any performance loss, but having to serialize large elements in an RDD for each RDD action can be very costly. I am not sure if this problem is unique to local mode. It seems like Spark can't possibly send the data in an RDD over the wire to workers for every action, even when the RDD is cached.
I'm using spark-core 3.0.0.
Upvotes: 2
Views: 3988
Reputation: 20826
This is because you are using parallelize. parallelize uses a special RDD, ParallelCollectionRDD, which puts the data into its Partitions. A Partition defines a Spark task, and it is sent to the executors inside the Spark task (ShuffleMapTask or ResultTask). If you print the stack trace in readExternal and writeExternal, you should be able to see that this happens while serializing and deserializing a Spark task.
In other words, the data is part of the Spark task metadata for a ParallelCollectionRDD, and Spark has to send tasks to the executors to run them; that is where the serialization happens.
Most other RDDs read their data from external systems (such as files), so they don't have this behavior.
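If the goal is to avoid re-serializing large elements on every action in local mode, one workaround sketch (reusing the question's LoggingSerializable and ConstantClosure, and assuming a scratch path such as /tmp/logging-ser is acceptable) is to materialize the collection to a file once and build the RDD from that file, so the task metadata only describes file splits:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkSerFromFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSerFromFile").setMaster("local")
    val spark = new SparkContext(conf)
    val path = "/tmp/logging-ser" // hypothetical scratch location

    // Writing the elements out still ships them once inside the save job's
    // task, but it is a one-time cost.
    spark.parallelize(Seq(new LoggingSerializable())).saveAsObjectFile(path)

    // An RDD built from a file only carries file-split information in its
    // partitions, so the elements are not serialized into every task.
    val rdd: RDD[LoggingSerializable] = spark.objectFile[LoggingSerializable](path).cache()

    rdd.foreach(ConstantClosure)
    rdd.foreach(ConstantClosure)
    spark.stop()
  }
}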
Upvotes: 3
Reputation: 66886
I agree that behavior looks surprising. Off the top of my head, I might guess that it's because caching the blocks is asynchronous, and all of this happens very fast. It's possible it simply does not wait around for the cached partition to become available and recomputes it the second time.
To test that hypothesis, introduce a lengthy wait before the second foreach just to see if that changes things.
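For example (a sketch reusing the question's SparkSer.main, between the two actions):

rdd.foreach(ConstantClosure)
println("xxx done 1")

// Give the asynchronous cache write time to complete before the second action,
// to check whether the recomputation is just a race with caching.
Thread.sleep(10000)

rdd.foreach(ConstantClosure)
println("xxx done 2")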
Upvotes: 1