The following code leads to a java.io.NotSerializableException when I try to dispatch the job to the executors:
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {
    /**
     * Serial Version Id
     */
    private static final long serialVersionUID = 6766320395808127072L;

    @Override
    public String call(Row row) throws Exception {
        return row.mkString(dataFormat.getDelimiter());
    }
});
However, when I do the following, the task is serialized successfully:
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
List<String> dataList = rddToWrite.collect().stream().parallel()
        .map(row -> row.mkString(dataFormat.getDelimiter()))
        .collect(Collectors.<String>toList());
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext());
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList);
Can anyone please help me point out what I'm doing wrong here?
Edit: dataFormat is a private member field of the class containing this code. It is an object of a class DataFormat that defines two fields: the Spark data format (e.g. "com.databricks.spark.csv") and the delimiter (e.g. "\t").
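For reference, a minimal sketch of what DataFormat looks like (the constructor and the getSparkFormat accessor are illustrative; only getDelimiter appears in the code above):

// Illustrative sketch of the DataFormat class described above; actual names may differ.
public class DataFormat {
    private final String sparkFormat; // e.g. "com.databricks.spark.csv"
    private final String delimiter;   // e.g. "\t"

    public DataFormat(String sparkFormat, String delimiter) {
        this.sparkFormat = sparkFormat;
        this.delimiter = delimiter;
    }

    public String getSparkFormat() { return sparkFormat; }
    public String getDelimiter()   { return delimiter; }
}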
Upvotes: 1
Views: 1086
Reputation: 170849
The anonymous class created by new Function ... needs a reference to the enclosing instance, and serializing the function requires serializing the enclosing instance, including dataFormat and all other fields. If the enclosing class is not marked as Serializable, or has any non-serializable non-transient fields, serialization fails. And even if it does serialize, it silently performs worse than necessary, because the entire enclosing object is shipped to the executors along with the function.
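You can see this hidden reference with a small standalone experiment (a sketch; EnclosingDemo is a made-up stand-in for the class holding dataFormat, and the exact synthetic field name, typically this$0, is compiler-specific):

import java.lang.reflect.Field;
import java.util.concurrent.Callable;

public class EnclosingDemo {
    private String delimiter = "\t"; // stand-in for the captured dataFormat field

    Callable<String> makeAnonymous() {
        // An anonymous class in an instance method implicitly captures `this`.
        return new Callable<String>() {
            @Override
            public String call() {
                return delimiter; // really this.delimiter via the enclosing instance
            }
        };
    }

    public static void main(String[] args) {
        Callable<String> fn = new EnclosingDemo().makeAnonymous();
        for (Field f : fn.getClass().getDeclaredFields()) {
            System.out.println(f); // prints a synthetic field of type EnclosingDemo
        }
    }
}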
Unfortunately, to fully work around this you need to create a named static nested class (or just a separate top-level class), and it can't be local, because neither anonymous nor local classes in Java can be static:
static class MyFunction implements Function<Row, String> {
    private static final long serialVersionUID = 6766320395808127072L;

    private final String delimiter;

    MyFunction(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    public String call(Row row) throws Exception {
        return row.mkString(delimiter);
    }
}
And then, since MyFunction captures only the delimiter String, nothing else needs to be serialized:
JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter()));
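If you are on Java 8, a lambda works as well, because lambdas, unlike anonymous classes, do not implicitly capture the enclosing instance, and Spark's Function is a serializable functional interface. A minimal sketch, assuming the delimiter is first copied into a local String:

// Evaluated once on the driver; the lambda captures only the (serializable) String.
final String delimiter = dataFormat.getDelimiter();
JavaRDD<String> stringRdd = rddToWrite.map(row -> row.mkString(delimiter));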
Upvotes: 2
Reputation: 1295
When you access dataFormat, it means this.dataFormat. So Spark will try to serialize this and encounter the NotSerializableException.
Try making a local copy first, like:
final DataFormat dataFormat = this.dataFormat;
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() ...
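Spelled out, the pattern looks like this (a sketch; the local variable must be effectively final, and DataFormat itself must still be Serializable, since the closure now captures the copy):

final DataFormat dataFormat = this.dataFormat; // local copy shadows the field
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Row row) throws Exception {
        return row.mkString(dataFormat.getDelimiter()); // resolves to the local copy
    }
});

One caveat: some javac versions emit a hidden reference to the enclosing instance (this$0) in anonymous inner classes even when it is unused, in which case this trick alone is not enough and the static class (or a lambda) from the other answer is the reliable fix. Copying just the string, final String delimiter = dataFormat.getDelimiter();, also removes the requirement that DataFormat be serializable.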
For more information, see http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark
Upvotes: 2