Reputation: 597
I'm new to spark, and was trying to run the example JavaSparkPi.java, it runs well, but because i have to use this in another java s I copy all things from main to a method in the class and try to call the method in main, it saids
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException
the code looks like this:
public class JavaSparkPi {
public void cal(){
JavaSparkContext jsc = new JavaSparkContext("local", "JavaLogQuery");
int slices = 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
System.out.println("count is: "+ dataSet.count());
dataSet.foreach(new VoidFunction<Integer>(){
public void call(Integer i){
System.out.println(i);
}
});
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) throws Exception {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
System.out.println("Pi is roughly " + 4.0 * count / n);
}
public static void main(String[] args) throws Exception {
JavaSparkPi myClass = new JavaSparkPi();
myClass.cal();
}
}
anyone have idea on this? thanks!
Upvotes: 12
Views: 25889
Reputation: 1
This error comes because you have multiple physical CPUs in your local or cluster and spark engine try to send this function to multiple CPUs over network. Your function
dataSet.foreach(new VoidFunction<Integer>(){
public void call(Integer i){
***System.out.println(i);***
}
});
uses println() which is not serialize. So the exception thrown by Spark Engine. The solution is you can use below:
dataSet.collect().forEach(new VoidFunction<Integer>(){
public void call(Integer i){
System.out.println(i);
}
});
Upvotes: 0
Reputation: 123
The main problem is that when you create an Anonymous Class in java it is passed a reference of the enclosing class. This can be fixed in many ways
This works in your case but will fall flat in case your enclosing class has some field that is not serializable. I would also say that serializing the parent class is a total waste.
Creating the closure by invoking some static function doesn't pass the reference to the closure and hence no need to make serializable this way.
Upvotes: 2
Reputation: 27455
The nested functions hold a reference to the containing object (JavaSparkPi
). So this object will get serialized. For this to work, it needs to be serializable. Simple to do:
public class JavaSparkPi implements Serializable {
...
Upvotes: 16