Reputation: 5483
I'm learning Spark and my first program is a word counter, but I have had problems executing the program in standalone mode.
When I execute the code in local mode (setMaster("local")) I have no problems, but when I try to execute it in standalone mode (with a master and a slave, started with the ./sbin/start-all.sh script), it doesn't finish. The program runs, but an exception is thrown while collecting the data (in the line counter.collectAsMap()).
The error is the following:
15/01/30 15:23:54 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, ******): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function.FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1
and the code is the following:
public class MainTest {
    public static void main(String[] args) {
        String origin = "originPath";
        SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("spark://localhost:7077");
        JavaSparkContext context = new JavaSparkContext(conf);
        JavaRDD<String> file = context.textFile(origin);
        JavaRDD<String> words = file.flatMap(s -> Arrays.asList(s.split(" ")));
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> counter = pairs.reduceByKey((valueA, valueB) -> valueA + valueB);
        Map<String, Integer> map = counter.collectAsMap();
        map.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
My machine has Oracle Java 8 installed, and I launch the master and one slave with the ./sbin/start-all.sh script.
What could the error be?
UPDATE:
I'm trying a Java 7 implementation and I have another problem. The exception is:
15/01/30 12:47:21 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, ***): java.lang.ClassNotFoundException: com.c4om.l3p3.sparkTest.MainTest
Maybe this is a configuration problem?
The Java 7 code is the following:
public static void countWordsJava7(String path, Boolean local, Boolean printResult) {
    SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("spark://localhost:7077");
    JavaSparkContext context = new JavaSparkContext(conf);
    JavaRDD<String> file = context.textFile(path);
    JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split(" "));
        }
    });
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    JavaPairRDD<String, Integer> counter = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });
    Map<String, Integer> map = counter.collectAsMap();
}
Upvotes: 1
Views: 2535
Reputation: 812
Set the jar path on the SparkConf and then it works. The jar path is the path of the jar file produced by compiling the project with Maven or Ant.
String master = "spark://localhost:7077";
SparkConf conf = new SparkConf()
    .setAppName(WordCountTask.class.getName())
    .setMaster(master)
    .setJars(new String[]{"/home/user/Projects/spark-test/target/first-example-1.0-SNAPSHOT.jar"});
Hope it helps.
Upvotes: 0
Reputation: 3412
I had the same error (cannot assign instance of java.lang.invoke.SerializedLambda) when running the code from an IDE such as IntelliJ IDEA. Invoking setJars(new String[]{"/path/to/jar/with/your/class.jar"}) on the SparkConf instance made it work.
For example:
SparkConf conf = new SparkConf().setAppName("LineCount");
conf.setMaster("spark://localhost:7077");
conf.setJars(new String[] { "/home/path/MainTest.jar" });
Upvotes: 0
Reputation: 27455
A ClassNotFoundException means a class is missing on the workers. There can be different reasons, depending on which class it is complaining about:
- It's a class from your own package. You have not deployed the jar file with your code to the workers. You can use SparkConf.setJars to have Spark distribute your jar for you.
- It's a system class. This suggests the workers are running a different version of Java than the application. For example, java.lang.invoke.SerializedLambda is a Java 8 class; if it's missing, your workers are running Java 7.
- It's a library class. You need to install the library on all workers, or use SparkConf.setJars to have Spark distribute your jar for you.
To debug this sort of problem, print the classpath on the workers.
rdd.foreachPartition { p => println(System.getProperty("java.class.path")) }
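Since the question uses the Java API, here is a rough Java equivalent of that snippet, as a sketch rather than part of the original answer: it assumes file is the JavaRDD<String> from the question and that org.apache.spark.api.java.function.VoidFunction and java.util.Iterator are imported. Printing java.version at the same time also tells you whether the workers are running Java 7 or Java 8, which covers the second case above.
file.foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> partition) {
        // This runs on the workers, so the output lands in the executors' stdout logs
        // (under the standalone worker's work/ directory), not in the driver console.
        System.out.println("classpath:    " + System.getProperty("java.class.path"));
        System.out.println("java.version: " + System.getProperty("java.version"));
    }
});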
Upvotes: 1