Reputation: 855
I'm trying to run the following simple Spark code:
Gson gson = new Gson();
JavaRDD<String> stringRdd = jsc.textFile("src/main/resources/META-INF/data/supplier.json");
JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>()
{
    private static final long serialVersionUID = -78238876849074973L;

    @Override
    public SupplierDTO call(String str) throws Exception
    {
        return gson.fromJson(str, SupplierDTO.class);
    }
});
But it's throwing the following error while executing the stringRdd.map statement:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.map(RDD.scala:288)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
at com.demo.spark.processor.cassandra.CassandraDataUploader.uploadData(CassandraDataUploader.java:71)
at com.demo.spark.processor.cassandra.CassandraDataUploader.main(CassandraDataUploader.java:47)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 7 more
Here 'jsc' is the JavaSparkContext object I'm using. As far as I know, JavaSparkContext is not a Serializable object and one should not use it inside any function that will be sent to the Spark workers.
Now, what I'm not able to understand is: how is the instance of JavaSparkContext being sent to the workers? What should I change in my code to avoid such a scenario?
Upvotes: 5
Views: 5059
Reputation: 382
You can use the code below instead of the return statement (return gson.fromJson(str, SupplierDTO.class);):

return new Gson().fromJson(str, SupplierDTO.class);

and remove the declaration Gson gson = new Gson(); at the top.
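Put together, the map call would then look roughly like this (a sketch only, reusing the question's stringRdd, Function and SupplierDTO):

JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>()
{
    private static final long serialVersionUID = 1L;

    @Override
    public SupplierDTO call(String str) throws Exception
    {
        // Gson is created inside call(), so the gson variable no longer
        // has to be captured from the enclosing method
        return new Gson().fromJson(str, SupplierDTO.class);
    }
});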
Upvotes: 0
Reputation: 2924
For me, I resolved this problem using one of the following choices:

- declare the Gson field transient
- make it static: static Gson gson = new Gson();

Please refer to the doc Job aborted due to stage failure: Task not serializable to see the other available choices for resolving this problem.
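For illustration, a minimal sketch of how those two choices might look as field declarations (the class name is borrowed from the question's stack trace; this is only one way to apply them):

import java.io.Serializable;

import com.google.gson.Gson;

public class CassandraDataUploader implements Serializable
{
    // transient: skipped by Java serialization, so not shipped with the
    // closure (the field will be null after deserialization on the executors)
    private transient Gson transientGson = new Gson();

    // static: not part of the instance state at all, so it is never
    // serialized with the enclosing object
    private static final Gson gson = new Gson();
}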
Upvotes: 4
Reputation: 37435
The gson reference is 'pulling' the outer class into the scope of the closure, taking its full object graph with it. In this case, create the gson object within the closure:
public SupplierDTO call(String str) throws Exception {
    Gson gson = new Gson();
    return gson.fromJson(str, SupplierDTO.class);
}
You can also declare the Spark context transient.
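For example, a sketch only (the class name comes from the question's stack trace):

import java.io.Serializable;

import org.apache.spark.api.java.JavaSparkContext;

public class CassandraDataUploader implements Serializable
{
    // transient: Java serialization skips this field, so the context is not
    // dragged along when the enclosing instance is captured by a closure
    private transient JavaSparkContext jsc;
}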
If the creation of the Gson instance is costly, consider using mapPartitions instead of map.
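A sketch of that variant, assuming the Spark 1.x Java API that the stack trace points to (in Spark 2.x, FlatMapFunction.call returns an Iterator instead of an Iterable):

JavaRDD<SupplierDTO> rdd = stringRdd.mapPartitions(
    new FlatMapFunction<Iterator<String>, SupplierDTO>()
    {
        @Override
        public Iterable<SupplierDTO> call(Iterator<String> lines) throws Exception
        {
            // one Gson instance per partition instead of one per record
            Gson gson = new Gson();
            List<SupplierDTO> result = new ArrayList<SupplierDTO>();
            while (lines.hasNext())
            {
                result.add(gson.fromJson(lines.next(), SupplierDTO.class));
            }
            return result;
        }
    });

(Iterator, List and ArrayList here are java.util; FlatMapFunction is org.apache.spark.api.java.function.FlatMapFunction.)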
Upvotes: 7