I am seeing a behavior in Spark (2.2.0) that I do not understand. I am guessing it is related to lambdas versus anonymous classes, because it shows up when I try to extract a lambda function into an explicitly defined function object:
This works:
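import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;

public class EventsFilter
{
    public Dataset< String > filter( Dataset< String > events )
    {
        // The lambda uses only its parameter, so it captures nothing from EventsFilter
        return events.filter( ( FilterFunction< String > ) x -> x.length() > 3 );
    }
}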
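To see why the lambda version is fine, here is a minimal sketch that takes Spark out of the picture. It assumes a hypothetical SerializablePredicate interface standing in for Spark's FilterFunction (which likewise extends java.io.Serializable), hypothetical class names, and plain java.io serialization, which is what the JavaSerializer in the stack trace of the failing version relies on. Because the lambda body never touches an instance member, it captures nothing, so it serializes even though its enclosing class is not Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Spark's FilterFunction<T>, which also extends Serializable.
interface SerializablePredicate<T> extends Serializable {
    boolean call(T value) throws Exception;
}

// Deliberately NOT Serializable, like EventsFilter in the question.
class LambdaEventsFilter {
    SerializablePredicate<String> makeFilter() {
        // The body never uses an instance member, so the lambda captures
        // nothing, not even 'this'.
        return x -> x.length() > 3;
    }
}

class LambdaCaptureDemo {
    // True if plain Java serialization of the object succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        SerializablePredicate<String> f = new LambdaEventsFilter().makeFilter();
        System.out.println("lambda serializes: " + serializes(f)); // prints "lambda serializes: true"
    }
}
```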
Yet this does not:
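import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;

public class EventsFilter
{
    public Dataset< String > filter( Dataset< String > events )
    {
        // Anonymous class declared in an instance method: it implicitly keeps a
        // reference to the enclosing EventsFilter (this$0 in the trace below)
        FilterFunction< String > filter = new FilterFunction< String >(){
            @Override public boolean call( String value ) throws Exception
            {
                return value.length() > 3;
            }
        };
        return events.filter( filter );
    }
}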
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    ...
Caused by: java.io.NotSerializableException: ...EventsFilter
Serialization stack:
    - object not serializable (class: ...EventsFilter, value: ...EventsFilter@e521067)
    - field (class: ...EventsFilter$1, name: this$0, type: class ...EventsFilter)
    - object (class ...EventsFilter$1, ...EventsFilter$1@5c70d7f0)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 4)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
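The same failure can be reproduced without Spark. In this sketch (the SerializablePredicate interface and class names are hypothetical stand-ins), an anonymous class created inside an instance method gets a synthetic field that javac generates to reference the enclosing object (named this$0 in the trace above), even though call() never uses it. Serializing the anonymous object therefore also tries to serialize the non-serializable outer object.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

// Hypothetical stand-in for Spark's FilterFunction<T>, which also extends Serializable.
interface SerializablePredicate<T> extends Serializable {
    boolean call(T value) throws Exception;
}

// Not Serializable, mirroring EventsFilter.
class AnonEventsFilter {
    SerializablePredicate<String> makeFilter() {
        // Anonymous class created in an instance method: javac adds a hidden
        // field pointing at the enclosing AnonEventsFilter, although call() never uses it.
        return new SerializablePredicate<String>() {
            @Override
            public boolean call(String value) {
                return value.length() > 3;
            }
        };
    }
}

class AnonymousCaptureDemo {
    // Returns the serialization error message, or null if serialization succeeds.
    static String serializationError(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // the name of the class that could not be serialized
        } catch (IOException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        SerializablePredicate<String> f = new AnonEventsFilter().makeFilter();
        // List the fields javac generated for the anonymous class.
        for (Field field : f.getClass().getDeclaredFields()) {
            System.out.println(field.getName() + " : " + field.getType().getSimpleName()
                    + (field.isSynthetic() ? " (synthetic)" : ""));
        }
        System.out.println("error: " + serializationError(f));
    }
}
```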
I am testing it with:
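import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.Test;

@Test
public void test()
{
    EventsFilter filter = new EventsFilter();
    Dataset< String > input = SparkSession.builder().appName( "test" ).master( "local" ).getOrCreate()
            .createDataset( Arrays.asList( "123", "123", "3211" ),
                    Encoders.kryo( String.class ) );
    Dataset< String > res = filter.filter( input );
    // Only "3211" is longer than 3 characters
    assertThat( res.count(), is( 1L ) );
}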
Even weirder, when the same code is put in a static main method, both versions seem to work.
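That observation is consistent with how javac treats static contexts: a static method has no 'this', so an anonymous class declared there gets no enclosing-instance field. A minimal sketch, again with a hypothetical SerializablePredicate interface in place of FilterFunction and hypothetical class names:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Spark's FilterFunction<T> (also Serializable).
interface SerializablePredicate<T> extends Serializable {
    boolean call(T value) throws Exception;
}

// Still NOT Serializable, but the factory method is static.
class StaticEventsFilter {
    static SerializablePredicate<String> makeFilter() {
        // No 'this' exists in a static method, so this anonymous class
        // has no field referring to a StaticEventsFilter instance.
        return new SerializablePredicate<String>() {
            @Override
            public boolean call(String value) {
                return value.length() > 3;
            }
        };
    }
}

class StaticContextDemo {
    // True if plain Java serialization of the object succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("static-context anonymous class serializes: "
                + serializes(StaticEventsFilter.makeFilter())); // prints "... serializes: true"
    }
}
```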
Why does defining the function explicitly inside a method cause that sneaky 'this' reference to be serialized?
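I was under the mistaken impression that lambdas are implemented under the hood as anonymous inner classes. That is not the case (this talk was very helpful): javac compiles them via invokedynamic, and a lambda captures 'this' only if its body actually uses it, explicitly or through an instance member. Also, as T. Gawęda answered, inner classes do in fact hold a reference to the enclosing instance, even when it is not needed (as here). This difference explains the behavior.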
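A minimal sketch of that difference, again with a hypothetical SerializablePredicate interface in place of FilterFunction and a hypothetical non-serializable enclosing class. A lambda that reads an instance field does capture 'this' and fails just like the anonymous class, while copying the field into a local variable or using a static nested class avoids the outer reference. These are common workarounds rather than the only options; making the enclosing class Serializable would also stop the exception, at the cost of shipping the whole object along with the closure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Spark's FilterFunction<T> (also Serializable).
interface SerializablePredicate<T> extends Serializable {
    boolean call(T value) throws Exception;
}

class FixesEventsFilter { // still NOT Serializable
    private final int minLength;

    FixesEventsFilter(int minLength) {
        this.minLength = minLength;
    }

    // Reads an instance field, so the lambda must capture 'this': fails to serialize.
    SerializablePredicate<String> capturingLambda() {
        return x -> x.length() > minLength;
    }

    // Copies the field into a local first, so only the int value is captured.
    SerializablePredicate<String> localCopyLambda() {
        int min = minLength;
        return x -> x.length() > min;
    }

    // Static nested class: holds its own copy of the threshold, no outer reference.
    static final class LengthFilter implements SerializablePredicate<String> {
        private final int min;

        LengthFilter(int min) {
            this.min = min;
        }

        @Override
        public boolean call(String value) {
            return value.length() > min;
        }
    }

    SerializablePredicate<String> nestedClass() {
        return new LengthFilter(minLength);
    }
}

class CaptureFixesDemo {
    // True if plain Java serialization of the object succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FixesEventsFilter f = new FixesEventsFilter(3);
        System.out.println("capturing lambda:  " + serializes(f.capturingLambda()));
        System.out.println("local-copy lambda: " + serializes(f.localCopyLambda()));
        System.out.println("static nested:     " + serializes(f.nestedClass()));
    }
}
```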