Dici

Reputation: 25950

Weirdness with Spark serialization

I've come across a problem with Spark when using the JavaPairRDD.repartitionAndSortWithinPartitions method. I've tried everything any reasonable person would have thought of. I finally wrote a snippet small enough to show the problem:

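import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class Main {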
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        final List<String> list = Arrays.asList("I", "am", "totally", "baffled");
        final HashPartitioner partitioner = new HashPartitioner(2);

        doSomething(sc, list, partitioner, String.CASE_INSENSITIVE_ORDER);
        doSomething(sc, list, partitioner, Main::compareString);
        doSomething(sc, list, partitioner, new StringComparator());
        doSomething(sc, list, partitioner, new SerializableStringComparator());
        doSomething(sc, list, partitioner, (s1,s2) -> Integer.compare(s1.charAt(0),s2.charAt(0)));
    }

    public static <T> void doSomething(JavaSparkContext sc, List<T> list, Partitioner partitioner, Comparator<T> comparator) {
        try {
            sc.parallelize(list)
                .mapToPair(elt -> new Tuple2<>(elt,elt))
                .repartitionAndSortWithinPartitions(partitioner,comparator)
                .count();
            System.out.println("success");
        } catch (Exception e) {
            System.out.println("failure");
        }
    }

    public static int compareString(String s1, String s2) {
        return Integer.compare(s1.charAt(0),s2.charAt(0));
    }

    public static class StringComparator implements Comparator<String> {
        @Override
        public int compare(String s1, String s2) {
            return Integer.compare(s1.charAt(0),s2.charAt(0));
        }
    }

    public static class SerializableStringComparator implements Comparator<String>, Serializable {
        @Override
        public int compare(String s1, String s2) {
            return Integer.compare(s1.charAt(0),s2.charAt(0));
        }
    }
}

Apart from the Spark logging, it outputs:

success
failure
failure 
success
failure

The exception thrown on failure is always the same:

org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:483)
org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:835)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Now I have my fix: declaring my custom comparator as Serializable. (I checked the standard library code: the case-insensitive string comparator is declared Serializable, so that makes sense.)

But why? Why can't I use lambdas here? I would have expected the second and the last calls to work, since I only used static methods and classes.

What I find especially weird is that I have registered the classes I am trying to serialize with Kryo, and the classes I did not register can be trivially serialized with their default serializer (Kryo uses FieldSerializer as the default for most classes). However, the Kryo registrator is never executed before the task serialization fails.

Upvotes: 1

Views: 1225

Answers (1)

Dici

Reputation: 25950

My question did not make clear why I was so baffled (the Kryo registration code was not being executed), so I edited it to say so.

I have figured out that Spark uses two different serializers:

  • one for serializing the tasks sent from the driver to the executors, called closureSerializer in the code (see SparkEnv.scala). As of this post, it can only be set to JavaSerializer.

  • one for serializing the actual data being processed, called serializer in SparkEnv. This one can be set to either JavaSerializer or KryoSerializer.

Registering a class with Kryo does not guarantee that it will always be serialized with Kryo; it depends on how the object is used. For example, the DAGScheduler only uses closureSerializer, so no matter how you configure serialization, your objects must be Java-serializable if the DAGScheduler handles them at some point (unless Spark enables Kryo serialization for closures in a later release).
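
To see which comparators plain Java serialization accepts, here is a minimal sketch that needs no Spark at all. It assumes that writing the object to an ObjectOutputStream is a fair stand-in for what JavaSerializer does with a closure. The class name ClosureSerializationCheck and the helper isJavaSerializable are hypothetical names made up for this illustration. It also shows one possible workaround for lambdas and method references: casting them to an intersection type that includes Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical demo class, not part of Spark.
public class ClosureSerializationCheck {

    // Returns true if plain Java serialization accepts the object.
    // Assumption: this approximates what Spark's JavaSerializer requires.
    public static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static int compareString(String s1, String s2) {
        return Integer.compare(s1.charAt(0), s2.charAt(0));
    }

    // Target type is only Comparator<String>, so the lambda is not Serializable.
    public static Comparator<String> plainLambda() {
        return (s1, s2) -> Integer.compare(s1.charAt(0), s2.charAt(0));
    }

    // The intersection cast makes the compiler generate a serializable lambda.
    public static Comparator<String> serializableLambda() {
        return (Comparator<String> & Serializable)
                (s1, s2) -> Integer.compare(s1.charAt(0), s2.charAt(0));
    }

    // The same cast works for a method reference.
    public static Comparator<String> serializableMethodRef() {
        return (Comparator<String> & Serializable) ClosureSerializationCheck::compareString;
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(String.CASE_INSENSITIVE_ORDER)); // true
        System.out.println(isJavaSerializable(plainLambda()));                 // false
        System.out.println(isJavaSerializable(serializableLambda()));          // true
        System.out.println(isJavaSerializable(serializableMethodRef()));       // true
    }
}
```

Using only static methods does not help: a lambda or method reference is Java-serializable only when its target type includes Serializable, whatever it captures. I have not verified the cast against every Spark version, so treat it as something to try rather than a guaranteed fix.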

Upvotes: 2
