Guille

Reputation: 2320

Spark - Kryo vs JavaSerialization. Same size?

I'm using caching with Spark. Right now I use several caches, and some of them are about 20 GB in memory. I tried first with cache(), and the size was huge, so I changed to persist with MEMORY_ONLY_SER and Java serialization, still getting about 20 GB for some of them. Now I want to use Kryo: I have registered the classes and I don't get any error, but for most of the caches the size is the same as with Java serialization.
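
For reference, the caching calls look roughly like this (a sketch; rdd stands for one of the cached RDDs):

import org.apache.spark.storage.StorageLevel

// Deserialized objects in memory: the largest footprint
rdd.cache()  // equivalent to persist(StorageLevel.MEMORY_ONLY)

// Serialized in memory: Java serialization by default, Kryo if configured
rdd.persist(StorageLevel.MEMORY_ONLY_SER)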

Some of the objects that I want to cache look like this:

case class ObjectToCache(id: Option[Long],
                         listObject1: Iterable[ObjectEnriched],
                         mp1: Map[String, ObjectEnriched2],
                         mp2: Map[String, ObjectEnriched3],
                         mp3: Map[String, ObjectEnriched4])

I have registered them with Kryo as:

kryo.register(classOf[Iterable[ObjectEnriched2]])
kryo.register(classOf[Map[String, ObjectEnriched3]])
kryo.register(classOf[Map[String, ObjectEnriched4]])
kryo.register(classOf[ObjectEnriched])
kryo.register(classOf[ObjectEnriched2])
kryo.register(classOf[ObjectEnriched3])
kryo.register(classOf[ObjectEnriched4])
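
For completeness, these registrations live in a custom KryoRegistrator, roughly like this (a sketch; the class name is a placeholder whose fully qualified name must match the spark.kryo.registrator setting):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Sketch: Spark instantiates this class and calls registerClasses
// when spark.kryo.registrator points to its fully qualified name.
class CustomKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ObjectToCache])
    kryo.register(classOf[ObjectEnriched])
    kryo.register(classOf[ObjectEnriched2])
    kryo.register(classOf[ObjectEnriched3])
    kryo.register(classOf[ObjectEnriched4])
  }
}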

Am I doing something wrong? Is there any way to know whether it is using Kryo? I think it is, because at some point I ran out of space and got an error like this:

Serialization trace:
mp1 (ObjectEnriched)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:183)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)

I'm using RDDs with Spark Streaming.

Upvotes: 0

Views: 1488

Answers (2)

rennerj2

Reputation: 76

Am I doing something wrong? Is there any way to know if it uses Kryo?

You are indeed using Kryo, and it is properly serializing your objects.

If you set these configuration properties:

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrator","com.orange.iris.common.serializer.CustomKryoRegistrator")

Then it will definitely use the Kryo serializer. Also, since you include the setting:

conf.set("spark.kryo.registrationRequired", "true") 

it will fail if it tries to serialize an unregistered class (see this answer for more info).

How much memory do you have? If the sizes are about the same with Java and Kryo serialization and you are persisting with MEMORY_ONLY_SER, it's possible that your partitions still don't fit in memory even with Kryo, and Spark is recomputing the parts that don't fit on the fly. That would cause the reported sizes to be the same.

One way to find out is to run the job persisting with MEMORY_AND_DISK_SER instead, then check for disk spillage when using Kryo. See here for more info on the storage levels.
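
To inspect this programmatically rather than through the UI, something like the following sketch works (sc and rdd are placeholders; getRDDStorageInfo is a developer API):

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.count()  // run an action so the cache is actually materialized

// Print how much of each cached RDD ended up in memory vs. spilled to disk
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"memory=${info.memSize} B, disk=${info.diskSize} B")
}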

Upvotes: 0

Gangadhar Kadam

Reputation: 546

To check whether a DataFrame (DF) is cached, trigger the caching by calling an action such as df.show, then check the Spark UI at http://localhost:4040/storage. You should see the DF listed there.

You may also use queryExecution or explain to see the InMemoryRelation:

scala> df.queryExecution.withCachedData

res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *Range (0, 1, step=1, splits=Some(8))

Also, try using Datasets instead of DataFrames. Datasets don't use the standard serialization methods; they use specialized columnar storage with its own compression, so you don't even need to persist your Dataset with the Kryo serializer.
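
For example (a sketch with a simplified placeholder case class), caching a Dataset goes through Spark's encoders rather than the Java or Kryo serializer:

import org.apache.spark.sql.{Dataset, SparkSession}

// Simplified placeholder; a real case class like ObjectToCache works the same way
case class Record(id: Long, name: String)

val spark = SparkSession.builder().appName("ds-cache").master("local[*]").getOrCreate()
import spark.implicits._

val ds: Dataset[Record] = Seq(Record(1L, "a"), Record(2L, "b")).toDS()
ds.cache()  // stored in the encoder's compact binary format, not Java/Kryo-serialized
ds.count()  // action to materialize the cache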

Upvotes: 0
