Reputation: 25950
I've come across a problem with Spark, using the JavaPairRDD.repartitionAndSortWithinPartitions
method. I've tried everything any reasonable person would have thought of. I finally wrote a small snippet simple enough to visualize the problem:
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        final List<String> list = Arrays.asList("I", "am", "totally", "baffled");
        final HashPartitioner partitioner = new HashPartitioner(2);

        doSomething(sc, list, partitioner, String.CASE_INSENSITIVE_ORDER);
        doSomething(sc, list, partitioner, Main::compareString);
        doSomething(sc, list, partitioner, new StringComparator());
        doSomething(sc, list, partitioner, new SerializableStringComparator());
        doSomething(sc, list, partitioner, (s1, s2) -> Integer.compare(s1.charAt(0), s2.charAt(0)));
    }

    public static <T> void doSomething(JavaSparkContext sc, List<T> list, Partitioner partitioner, Comparator<T> comparator) {
        try {
            sc.parallelize(list)
              .mapToPair(elt -> new Tuple2<>(elt, elt))
              .repartitionAndSortWithinPartitions(partitioner, comparator)
              .count();
            System.out.println("success");
        } catch (Exception e) {
            System.out.println("failure");
        }
    }

    public static int compareString(String s1, String s2) {
        return Integer.compare(s1.charAt(0), s2.charAt(0));
    }

    public static class StringComparator implements Comparator<String> {
        @Override
        public int compare(String s1, String s2) {
            return Integer.compare(s1.charAt(0), s2.charAt(0));
        }
    }

    public static class SerializableStringComparator implements Comparator<String>, Serializable {
        @Override
        public int compare(String s1, String s2) {
            return Integer.compare(s1.charAt(0), s2.charAt(0));
        }
    }
}
Apart from the Spark logging, it outputs:
success
failure
failure
success
failure
The exception thrown in case of failure is always the same:
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:483)
org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:835)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Now I've got my fix: declaring my custom comparator as Serializable
(I checked in the standard library code: the String case-insensitive comparator is declared as serializable, so that makes sense).
But why? Why should I not use lambdas here? I would have expected the second and the last calls to work properly, since I only used static methods and classes.
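As a side note, a sketch of what I would expect to also work if I wanted to keep the lambda style, using a plain Java 8 intersection-type cast (untested here, not Spark-specific):

// Casting the lambda to an intersection type makes the generated class
// also implement Serializable, so the closure serializer should accept it.
doSomething(sc, list, partitioner,
        (Comparator<String> & Serializable) (s1, s2) -> Integer.compare(s1.charAt(0), s2.charAt(0)));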
What I find especially weird is that I have registered the classes I am trying to serialize with Kryo, and the classes I did not register can be trivially serialized with their default associated serializer (Kryo associates FieldSerializer
as the default one for most classes). However, my Kryo registrator is never executed before the task fails to serialize.
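The kind of registration I mean is sketched below; the registrator class and the registered classes are just an example of the usual setup, not necessarily my exact code:

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

// A minimal registrator of the kind referred to above; the registered
// classes are placeholders taken from the snippet.
public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(Main.StringComparator.class);
        kryo.register(Main.SerializableStringComparator.class);
    }
}

// Wired up through the Spark configuration:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", MyRegistrator.class.getName());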
Upvotes: 1
Views: 1225
Reputation: 25950
My question did not clearly explain why I was so baffled (about the Kryo registration code not being executed), so I edited it to reflect that.
I have figured out that Spark uses two different serializers:
one for serializing the tasks from the master to the slaves, called closureSerializer in the code (see SparkEnv.scala). At the time of my post it can only be set to JavaSerializer.
one for serializing the actual data that is processed, called serializer in SparkEnv. This one can be set to either JavaSerializer or KryoSerializer.
Registering a class with Kryo does not ensure that it will always be serialized with Kryo; it depends on how it is used. As an example, the DAGScheduler only uses the closureSerializer, so no matter how you configure serialization, you will always need to make your objects Java-serializable if they are manipulated by the DAGScheduler at some point (unless Spark enables Kryo serialization for closures in later releases).
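To illustrate: even with the data serializer switched to Kryo as below, the comparator passed to repartitionAndSortWithinPartitions is part of the task closure and still goes through JavaSerializer, which is why it has to be Serializable. This is only a sketch of the relevant configuration, assuming the Spark 1.x behaviour described above:

SparkConf conf = new SparkConf()
        .setAppName("test")
        .setMaster("local")
        // Configures the data serializer only; the closure serializer used
        // to ship tasks (and the comparator inside them) stays JavaSerializer.
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

// The comparator still has to be Serializable because it travels inside the task:
new JavaSparkContext(conf)
        .parallelize(Arrays.asList("I", "am", "totally", "baffled"))
        .mapToPair(s -> new Tuple2<>(s, s))
        .repartitionAndSortWithinPartitions(new HashPartitioner(2), new Main.SerializableStringComparator())
        .count();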
Upvotes: 2