Arka Ghosh
Arka Ghosh

Reputation: 855

org.apache.spark.SparkException: Task not serializable - JavaSparkContext

I'm trying to run the following simple Spark code:

Gson gson = new Gson();
JavaRDD<String> stringRdd = jsc.textFile("src/main/resources/META-INF/data/supplier.json");

JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>()
{
    private static final long serialVersionUID = -78238876849074973L;

    @Override
    public SupplierDTO call(String str) throws Exception
    {
        return gson.fromJson(str, SupplierDTO.class);
    }
});

But it's throwing the following error while executing the stringRdd.map statement:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.map(RDD.scala:288)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
at com.demo.spark.processor.cassandra.CassandraDataUploader.uploadData(CassandraDataUploader.java:71)
at com.demo.spark.processor.cassandra.CassandraDataUploader.main(CassandraDataUploader.java:47)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 7 more

Here 'jsc' is the JavaSparkContext object I'm using. As far as I know, JavaSparkContext is not a Serializable object and one should not use it inside any functions which will be sent to the Spark workers.

Now, what I'm not able to understand is, how the instance of JavaSparkContext is being sent to the workers? What should I change in my code to avoid such scenario?

Upvotes: 5

Views: 5059

Answers (3)

mrr
mrr

Reputation: 382

You can use below code instead of line 9.(return gson.fromJson(str, SupplierDTO.class);)

return new Gson().fromJson(str, SupplierDTO.class);//this is correct

and remove line 1.(Gson gson = new Gson();)

Upvotes: 0

user1314742
user1314742

Reputation: 2924

For me I resolved this problem using one of the following choices:

  1. As mentioned above, by declaring SparkContext as transient
  2. You could also try to make the object gson static static Gson gson = new Gson();

Please refer to the doc Job aborted due to stage failure: Task not serializable

to see other available choices to resolve this probleme

Upvotes: 4

maasg
maasg

Reputation: 37435

The gson reference is 'pulling' the outer class into the scope of the closure, taking its full object graph with it.

In this case, create the gson object within the closure:

public SupplierDTO call(String str) throws Exception {   
   Gson gson = Gson();
   return gson.fromJson(str, SupplierDTO.class);
}

You can also declare the spark context transient

If the creation of the Gson instance is costly, consider using mapPartitions instead of map.

Upvotes: 7

Related Questions