harel

Reputation: 525

Spark Java API Task not serializable when not using Lambda

I am seeing behavior in Spark (2.2.0) that I do not understand, but I'm guessing it is related to lambdas and anonymous classes, since it appears when I try to extract a lambda function out into its own object:

This works:

public class EventsFilter
{
    public Dataset< String > filter( Dataset< String > events )
    {
        return events.filter( ( FilterFunction< String > ) x -> x.length() > 3 );
    }
}

Yet this does not:

public class EventsFilter
{
    public Dataset< String > filter( Dataset< String > events )
    {
        FilterFunction< String > filter = new FilterFunction< String >(){
            @Override public boolean call( String value ) throws Exception
            {
                return value.length() > 3;
            }
        };
        return events.filter( filter );
    }
}

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) ...
...
Caused by: java.io.NotSerializableException: ...EventsFilter
Serialization stack:
    - object not serializable (class: ...EventsFilter, value: ...EventsFilter@e521067)
    - field (class: ...EventsFilter$1, name: this$0, type: class ...EventsFilter)
    - object (class ...EventsFilter$1, ...EventsFilter$1@5c70d7f0)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 4)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

I am testing against:

@Test
public void test()
{
    EventsFilter filter = new EventsFilter();
    Dataset<String> input = SparkSession.builder().appName( "test" ).master( "local" ).getOrCreate()
            .createDataset( Arrays.asList( "123" , "123"  , "3211" ) ,
                Encoders.kryo( String.class ) );

    Dataset<String> res = filter.filter( input );
    assertThat( res.count() , is( 1L ) );
}

Even weirder, when the same code is put in a static main method, both versions seem to work...

How does defining the function explicitly inside a method cause that sneaky 'this' reference to get serialized?

Upvotes: 1

Views: 1710

Answers (2)

harel

Reputation: 525

I was under the false impression that lambdas are implemented under the hood as inner classes. This is no longer the case (very helpful talk). Also, as T. Gawęda answered, inner classes do in fact hold a reference to the outer class, even when it is not needed (here). This difference explains the behavior.
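To see that difference outside of Spark, here is a minimal standalone sketch (my own illustration, not from the post; Outer, SerializableCheck and the method names are made up), assuming the Java 8 compiler this question targets: the anonymous class drags the enclosing instance along through its hidden this$0 field, while the equivalent non-capturing lambda serializes cleanly.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class Outer
{
    // A serializable functional interface, standing in for Spark's FilterFunction.
    interface SerializableCheck extends Serializable
    {
        boolean test( String value );
    }

    SerializableCheck anonymous()
    {
        // The anonymous class keeps a hidden this$0 field pointing at the
        // enclosing Outer instance, even though it never uses it.
        return new SerializableCheck(){
            @Override public boolean test( String value )
            {
                return value.length() > 3;
            }
        };
    }

    SerializableCheck lambda()
    {
        // The lambda captures nothing, so it holds no reference to Outer.
        return value -> value.length() > 3;
    }

    static void serialize( Object o ) throws Exception
    {
        try ( ObjectOutputStream out = new ObjectOutputStream( new ByteArrayOutputStream() ) )
        {
            out.writeObject( o );
        }
    }

    public static void main( String[] args ) throws Exception
    {
        Outer outer = new Outer();
        serialize( outer.lambda() );     // fine: nothing captured
        serialize( outer.anonymous() );  // java.io.NotSerializableException: Outer
    }
}

The second serialize call fails with java.io.NotSerializableException: Outer, mirroring the this$0 entry in the Spark serialization stack above.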

Upvotes: 1

T. Gawęda

Reputation: 16076

Java's inner classes hold a reference to the outer class. Your outer class is not serializable, so the exception is thrown.

Lambdas do not hold a reference to the outer class if that reference is not used, so there is no problem with a non-serializable outer class. More here
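Building on that, one way to keep the explicit-class version working is to avoid the outer-class capture entirely, for example with a static nested class (a sketch of my own, not from the answer; the class name LongerThanThree is made up):

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;

public class EventsFilter
{
    // A static nested class has no hidden this$0 field, so serializing the
    // filter no longer drags the EventsFilter instance along with it.
    private static class LongerThanThree implements FilterFunction< String >
    {
        @Override public boolean call( String value ) throws Exception
        {
            return value.length() > 3;
        }
    }

    public Dataset< String > filter( Dataset< String > events )
    {
        return events.filter( new LongerThanThree() );
    }
}

Alternatively, since EventsFilter has no state here, making it implement java.io.Serializable should also make the original anonymous-class version pass.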

Upvotes: 3
