user2810081

Reputation: 597

java+spark: org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException

I'm new to Spark and was trying to run the example JavaSparkPi.java. It runs fine, but because I have to use it from another Java program, I copied everything from main into a method in the class and call that method from main. Now it says:

org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException

The code looks like this:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

public class JavaSparkPi {

    public void cal() {
        JavaSparkContext jsc = new JavaSparkContext("local", "JavaLogQuery");
        int slices = 2;
        int n = 100000 * slices;

        List<Integer> l = new ArrayList<Integer>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

        System.out.println("count is: " + dataSet.count());
        dataSet.foreach(new VoidFunction<Integer>() {
            public void call(Integer i) {
                System.out.println(i);
            }
        });

        int count = dataSet.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                double x = Math.random() * 2 - 1;
                double y = Math.random() * 2 - 1;
                return (x * x + y * y < 1) ? 1 : 0;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        System.out.println("Pi is roughly " + 4.0 * count / n);
    }

    public static void main(String[] args) throws Exception {
        JavaSparkPi myClass = new JavaSparkPi();
        myClass.cal();
    }
}

Does anyone have any idea about this? Thanks!

Upvotes: 12

Views: 25889

Answers (3)

Jeetu

Reputation: 1

This error occurs because you have multiple physical CPUs on your local machine or in your cluster, and the Spark engine tries to send this function to multiple CPUs over the network. Your function

dataSet.foreach(new VoidFunction<Integer>() {
    public void call(Integer i) {
        System.out.println(i);  // <-- the offending call
    }
});

uses println(), which is not serializable, so the exception is thrown by the Spark engine. As a solution, you can use the following:

for (Integer i : dataSet.collect()) {
    System.out.println(i);
}
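Note that collect() brings the entire RDD back to the driver, so printing this way is only practical for small data sets; the original foreach runs on the executors, where the output ends up in the executor logs instead.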

Upvotes: 0

Anuj J

Reputation: 123

The main problem is that when you create an anonymous class in Java, it is passed a reference to the enclosing class. This can be fixed in several ways:

Declare the enclosing class Serializable

This works in your case, but it will fall flat if your enclosing class has a field that is not serializable. I would also say that serializing the parent class is a total waste.
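As a hypothetical illustration (the jsc field here is my own example, not from the question's code): if the enclosing class kept its JavaSparkContext in a field, declaring the class Serializable would not help, because JavaSparkContext itself is not serializable:

import java.io.Serializable;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical variant: the class is declared Serializable, yet
// serializing an instance still fails, because Java serialization
// also tries to serialize the non-serializable jsc field.
public class JavaSparkPi implements Serializable {
    private JavaSparkContext jsc = new JavaSparkContext("local", "JavaLogQuery");

    // Any anonymous function created in an instance method captures
    // 'this', and with it the non-serializable jsc field.
}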

Create the closure in a static function

Creating the closure inside a static function does not pass a reference to the enclosing instance into the closure, so nothing needs to be made serializable this way; see the sketch below.
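A minimal sketch of that fix, applied to the question's code (the helper names unitCircleHit and sum are made up for illustration): because the anonymous classes are created inside static methods, they capture no JavaSparkPi instance, and the class does not need to implement Serializable.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class JavaSparkPi {  // no Serializable needed

    // Anonymous functions built inside static methods have no
    // enclosing JavaSparkPi instance, so only the functions
    // themselves are serialized and shipped to the executors.
    private static Function<Integer, Integer> unitCircleHit() {
        return new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                double x = Math.random() * 2 - 1;
                double y = Math.random() * 2 - 1;
                return (x * x + y * y < 1) ? 1 : 0;
            }
        };
    }

    private static Function2<Integer, Integer, Integer> sum() {
        return new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        };
    }

    public void cal() {
        JavaSparkContext jsc = new JavaSparkContext("local", "JavaLogQuery");
        int slices = 2;
        int n = 100000 * slices;

        List<Integer> l = new ArrayList<Integer>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
        int count = dataSet.map(unitCircleHit()).reduce(sum());
        System.out.println("Pi is roughly " + 4.0 * count / n);
    }
}

Note that every function shipped to the cluster has to be created this way: a single anonymous class left inside the instance method would reintroduce the exception.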

Upvotes: 2

Daniel Darabos

Reputation: 27455

The nested functions hold a reference to the containing object (JavaSparkPi). So this object will get serialized. For this to work, it needs to be serializable. Simple to do:

public class JavaSparkPi implements Serializable {
  ...

Upvotes: 16
