user1663386

Reputation:

Getting java.io.NotSerializableException while mapping a JavaRDD

The following code leads to a java.io.NotSerializableException when I try to dispatch the job to the executors:

    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {

        /**
         * Serial Version Id
         */
        private static final long serialVersionUID = 6766320395808127072L;

        @Override
        public String call(Row row) throws Exception {
            return row.mkString(dataFormat.getDelimiter());
        }
    });

However, when I do the following, the task is serialized successfully:

    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    List<String> dataList = rddToWrite.collect().stream().parallel()
                               .map(row -> row.mkString(dataFormat.getDelimiter()))
                               .collect(Collectors.<String>toList());
    JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext());
    JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList);

Can anyone please help me point out what I'm doing wrong here?

Edit: dataFormat is a private member field of the class that contains this code. It is an instance of a class DataFormat with two fields: the Spark data format (e.g. "com.databricks.spark.csv") and the delimiter (e.g. "\t").

Upvotes: 1

Views: 1086

Answers (2)

Alexey Romanov

Reputation: 170849

The anonymous class created by new Function ... holds a reference to the enclosing instance, so serializing the function requires serializing the enclosing instance, including dataFormat and every other field. If the enclosing class isn't marked Serializable, or has any non-serializable non-transient field, serialization fails. And even when it succeeds, it silently ships more data to the executors than necessary.
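
To see the capture in isolation, here is a minimal, self-contained sketch (plain JDK serialization, no Spark; the class and field names are made up for illustration) that reproduces the same failure:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class CaptureDemo {
        private String delimiter = "\t"; // field of the non-serializable enclosing class

        Serializable makeFunction() {
            // Reading `delimiter` forces the anonymous class to keep a hidden
            // reference (this$0) to the enclosing CaptureDemo instance.
            return new Serializable() {
                private static final long serialVersionUID = 1L;
                @Override
                public String toString() {
                    return delimiter; // really this$0.delimiter
                }
            };
        }

        public static void main(String[] args) throws IOException {
            Serializable f = new CaptureDemo().makeFunction();
            // Fails with java.io.NotSerializableException: CaptureDemo,
            // because serializing f drags the enclosing instance along.
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f);
        }
    }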

Unfortunately, to fully work around this you need to create a named static nested class (or just a separate top-level class), and it can't even be local, because neither anonymous nor local classes in Java can be static:

    static class MyFunction implements Function<Row, String> {
        private static final long serialVersionUID = 6766320395808127072L;

        private final String delimiter;

        MyFunction(String delimiter) {
            this.delimiter = delimiter;
        }

        @Override
        public String call(Row row) throws Exception {
            return row.mkString(delimiter);
        }
    }

And then:

    JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter()));
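
As an aside, on Java 8+ you can avoid the named class entirely: a lambda whose body touches no instance members does not capture this, so copying the delimiter into a local variable is enough (a sketch reusing the question's variable names):

    String delimiter = dataFormat.getDelimiter(); // local, effectively final
    JavaRDD<String> stringRdd = rddToWrite.map(row -> row.mkString(delimiter));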

Upvotes: 2

Mo Tao

Reputation: 1295

When you access dataFormat inside the function, it really means this.dataFormat, so Spark will try to serialize this (the whole enclosing object) and run into the NotSerializableException.

Try making a local copy first, like this:

    DataFormat dataFormat = this.dataFormat; // local copy; same name, so it shadows the field
    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() ...
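
Spelled out, a minimal sketch of the whole pattern might look like this. Two caveats, both assumptions worth checking against your setup: DataFormat itself must implement Serializable, because the local copy still travels with the closure; and the trick is only fully reliable with a lambda, since javac (before JDK 18) gives a non-static anonymous class a hidden reference to the enclosing instance even when it is unused.

    // Assumes DataFormat implements Serializable; the lambda captures
    // only the local copy, never `this`.
    DataFormat dataFormat = this.dataFormat;
    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    JavaRDD<String> stringRdd = rddToWrite.map(row -> row.mkString(dataFormat.getDelimiter()));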

For more information, see http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

Upvotes: 2
