Gaurav Ram

Reputation: 1085

Spark + Kafka streaming NoClassDefFoundError kafka/serializer/StringDecoder

I'm trying to send messages from my Kafka producer and consume them in Spark Streaming, but I get the following error when I run my application with spark-submit.

Error

 Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
        at com.spark_stream.Main.main(Main.java:37)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 10 more

Application code is as follows:

Main.java

package com.spark_stream;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class Main {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        System.out.println("spark started!");

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));


        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("speed");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });

        System.out.println("connection completed");


        ssc.start();

        ssc.awaitTermination();

        System.out.println("spark ended!");

    }

}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.spark_stream</groupId>
  <artifactId>com.spark_stream</artifactId>
  <version>0.0.1-SNAPSHOT</version>


    <dependencies>

    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>


    </dependencies>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
</project>

I couldn't find a solution for this error. Any help would be appreciated.

Upvotes: 2

Views: 4251

Answers (3)

skjagini

Reputation: 3217

This usually happens when you don't bundle all the dependencies your application needs. Try building an uber jar that contains all of them.

Below is a portion of a sample pom file that does this.

<build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
</build>
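
To check that the jar produced by the build really contains the missing class, something like the following sketch could be run against the build output. The names JarContentsCheck and jarContains are hypothetical, chosen only for illustration, and the jar path is whatever your build produces.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Spark or Kafka): looks inside a jar file
// for the .class entry that corresponds to a fully qualified class name.
class JarContentsCheck {

    static boolean jarContains(String jarPath, String className) throws IOException {
        // kafka.serializer.StringDecoder -> kafka/serializer/StringDecoder.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: JarContentsCheck <path-to-jar> [class-name]");
            return;
        }
        // Defaults to the class named in the stack trace from the question.
        String className = args.length > 1 ? args[1] : "kafka.serializer.StringDecoder";
        System.out.println(className + " in " + args[0] + ": " + jarContains(args[0], className));
    }
}
```

If this reports false for the jar you pass to spark-submit, the dependencies were not bundled into it.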

Upvotes: 0

Ayush Pandey

Reputation: 168

It seems the Kafka jars cannot be found because they are not included in your pom file. Try adding the dependency below to your pom file, and check which Kafka version you are using.

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.0</version>
</dependency>

Upvotes: 2

Adonis

Reputation: 4818

Have a look at the doc: http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit

More specifically the part:

Path to a bundled jar including your application and all dependencies.

However, your pom.xml shows that the jar you are building does not include its dependencies. That's why spark-submit cannot find the class kafka.serializer.StringDecoder.
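
One way to confirm this from inside the application is a small classpath probe, sketched below. ClasspathCheck and isOnClasspath are hypothetical names, not part of Spark or Kafka; the sketch only relies on the standard Class.forName call.

```java
// Hypothetical diagnostic helper: reports whether a class can be loaded
// from the classpath that the running JVM actually sees.
class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace from the question.
        String name = args.length > 0 ? args[0] : "kafka.serializer.StringDecoder";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

Packaged into the same jar and launched through spark-submit, this should print false as long as the Kafka classes are missing from the bundled jar.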

To solve this, you can use a plugin that includes your dependencies inside your jar; the Maven Assembly Plugin can help with this.

Upvotes: 2
