fhuertas

Reputation: 5483

Spark fails with ClassNotFoundException for SerializedLambda

I'm learning Spark, and my first program is a word counter. I have had problems executing the program in standalone mode.

When I execute the code in local mode (setMaster("local")) I have no problems, but when I try to execute it in standalone mode (with a master and a slave, started with the ./sbin/start-all.sh script), it doesn't finish. The program runs, but when collecting the data an exception is thrown, at the line counter.collectAsMap().

The error is the following:

15/01/30 15:23:54 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, ******): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function.FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1

and the code is the following:

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class MainTest {
    public static void main(String[] args) throws Exception {
        String origin = "originPath";

        SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("spark://localhost:7077");
        JavaSparkContext context = new JavaSparkContext(conf);
        JavaRDD<String> file = context.textFile(origin);
        JavaRDD<String> words = file.flatMap(s -> Arrays.asList(s.split(" ")));
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> counter = pairs.reduceByKey((valueA, valueB) -> valueA + valueB);
        Map<String, Integer> map = counter.collectAsMap();
        map.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}

My machine has Oracle Java 8 installed, and I launch the master and one slave with the ./sbin/start-all.sh script.

What could be causing the error?

UPDATE:

I tried a Java 7 implementation and ran into a different problem. The exception is:

15/01/30 12:47:21 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, ***): java.lang.ClassNotFoundException: com.c4om.l3p3.sparkTest.MainTest

Maybe this is a configuration problem?

The Java 7 code is the following:

public static void countWordsJava7(String path, Boolean local, Boolean printResult) {
    SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("spark://localhost:7077");
    JavaSparkContext context = new JavaSparkContext(conf);
    JavaRDD<String> file = context.textFile(path);
    JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split(" "));
        }
    });
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    JavaPairRDD<String, Integer> counter = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });
    Map<String, Integer> map = counter.collectAsMap();
}

Upvotes: 1

Views: 2535

Answers (3)

VanThaoNguyen

Reputation: 812

Set the jar path on the SparkConf and it works. The jar path is the path of the jar file produced by compiling with Maven or Ant.

String master = "spark://localhost:7077";
SparkConf conf = new SparkConf()
        .setAppName(WordCountTask.class.getName())
        .setMaster(master)
        .setJars(new String[]{"/home/user/Projects/spark-test/target/first-example-1.0-SNAPSHOT.jar"});

Hope it helps.

Upvotes: 0

Green Lei

Reputation: 3412

I had the same cannot assign instance of java.lang.invoke.SerializedLambda error when running code from an IDE such as IntelliJ IDEA. Invoking setJars(new String[]{"/path/to/jar/with/your/class.jar"}) on the SparkConf instance fixed it.

For example:

SparkConf conf = new SparkConf().setAppName("LineCount");
conf.setMaster("spark://localhost:7077");
conf.setJars(new String[] { "/home/path/MainTest.jar" });
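
Note that (in Spark 1.x at least) the path passed to setJars is resolved on the driver machine, which then serves the jar to the executors, so the file must exist at that local path when the application starts, and the jar must be rebuilt after code changes.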

Upvotes: 0

Daniel Darabos

Reputation: 27455

ClassNotFoundException means a class is missing on the workers. There can be different reasons, depending on what class it is complaining about:

  • It's a class from your own package. You have not deployed the jar file with your code to the workers. You can use SparkConf.setJars to have Spark distribute your jar for you.

  • It's a system class. This suggests the workers are running a different version of Java than the application. For example, java.lang.invoke.SerializedLambda is a Java 8 class: if it's missing, your workers are running Java 7. A quick way to check this is shown in the sketch after this list.

  • It's a library class. You need to install the library on all workers, or use SparkConf.setJars to have Spark distribute your jar for you.
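
A minimal Java sketch to check the second case, assuming the JavaSparkContext from the question is named context. It prints each worker's JVM version to the executor logs, and deliberately uses an anonymous class rather than a lambda so it can still deserialize on Java 7 workers:

// Needs java.util.Arrays, java.util.Iterator and
// org.apache.spark.api.java.function.VoidFunction.
JavaRDD<Integer> probe = context.parallelize(Arrays.asList(1, 2, 3, 4), 4);
probe.foreachPartition(new VoidFunction<Iterator<Integer>>() {
    @Override
    public void call(Iterator<Integer> it) {
        // Runs on the worker; compare against the driver's java.version.
        System.out.println("Worker JVM: " + System.getProperty("java.version"));
    }
});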

To debug this sort of problem, print the classpath on the workers, for example (in Scala):

rdd.foreachPartition { p => println(System.getProperty("java.class.path")) }
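
Since the question is in Java, an equivalent Java 8 sketch (assuming an existing JavaRDD named rdd) would be:

rdd.foreachPartition(it -> System.out.println(System.getProperty("java.class.path")));

Keep in mind that if your workers are on Java 7, the lambda form will hit the same deserialization error, so use an anonymous class there instead.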

Upvotes: 1
