raviabhiram

Reputation: 720

NoClassDefFoundError when running flink with kafka connector

I am trying to stream data from Kafka using Flink. My code compiles without errors, but when I run it I get the following error:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more  

My POM dependency list is as follows:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-core</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>0.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>  
    </dependencies>

The Java code I am trying to run just subscribes to a Kafka topic called 'streamer':

import java.util.Properties;
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class StreamConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "samplegroup");

        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>("streamer", new SimpleStringSchema(), properties));

        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Streamed data: " + value;
            }
        }).print();
        env.execute();
    }
}

System information:
1. Kafka version: 0.9.0.1
2. Flink version: 1.3.2
3. OpenJDK version: 1.8

Although I am using Maven, I do not think this is a Maven issue, because I get the same error without Maven. I manually downloaded all the necessary .jar files to a folder and passed that folder with the -cp option when compiling with javac. Compilation succeeds, but I get the same error as above at run time.

Upvotes: 5

Views: 17043

Answers (5)

pstiegele

Reputation: 156

Another possible cause: if the job is started from IntelliJ, dependencies with "provided" scope are not included in the classpath. This is a common problem.

Go to the Run/Debug Configuration in IntelliJ, click "Modify options", and select "Add dependencies with 'provided' scope to classpath".


Upvotes: 4

raviabhiram

Reputation: 720

I figured out the reason, and it seems like a really silly error now. In my case the jar files weren't available at run time. I ended up not using Maven at all: I compiled with javac -cp <path_to_jar_files> and then ran with java -cp <path_to_jar_files>, passing the same jar path both times.
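
A quick way to confirm this kind of problem is to ask the JVM what is actually on its runtime classpath. The following is a minimal sketch, not part of Flink: `ClasspathCheck` and `isLoadable` are hypothetical names introduced here for illustration. It prints each classpath entry and reports whether a given class can be found.

```java
import java.io.File;

// Hypothetical diagnostic helper (not a Flink API): checks whether a class
// is visible on the classpath the JVM was actually started with.
final class ClasspathCheck {

    // Returns true if the named class can be located by this class's loader.
    static boolean isLoadable(String className) {
        try {
            // initialize = false: only locate the class, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError for missing transitive classes.
            return false;
        }
    }

    public static void main(String[] args) {
        // Print every runtime classpath entry so a missing jar is easy to spot.
        String classpath = System.getProperty("java.class.path");
        for (String entry : classpath.split(File.pathSeparator)) {
            System.out.println("classpath entry: " + entry);
        }
        // Default to the class named in the stack trace from the question.
        String name = args.length > 0
                ? args[0]
                : "org.apache.flink.streaming.util.serialization.DeserializationSchema";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

Run it with the same -cp value used for the job. If the class is reported as not loadable, the jar that contains it is missing from the runtime classpath, even if compilation succeeded.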

Upvotes: 1

rishi007bansod

Reputation: 1469

Try removing the scope if you have specified one anywhere in the POM file, since it restricts which classes are available at run time.


Upvotes: 3

abalcerek

Reputation: 1819

It looks like the first problem with your POM is that you use different versions for your Flink dependencies. Try using the newer version, 1.3.2, for all Flink modules. This error often occurs when you mix incompatible or multiple versions of libraries.

Try using the following dependencies (assuming you are using Scala 2.11):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>  
</dependencies>

If you still have the same problem, post example code so I can reproduce the error.
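
To spot mixed Flink versions in a folder of jars, something like the sketch below may help. It assumes jar files follow Maven's usual `artifactId-version.jar` naming with a plain dotted numeric version (names such as `1.3.2-SNAPSHOT` would not match), and `FlinkVersionCheck` with its methods is a hypothetical helper written for this answer, not part of Flink or Maven. The jar names in `main` are derived from the POM in the question.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: flags a set of jars that mixes several Flink versions.
final class FlinkVersionCheck {

    // Matches names like flink-clients-0.10.2.jar: artifact name, then a dotted numeric version.
    private static final Pattern FLINK_JAR =
            Pattern.compile("^(flink-.+)-(\\d+(?:\\.\\d+)+)\\.jar$");

    // Maps each Flink artifact name to the version parsed from its jar file name.
    static Map<String, String> flinkVersions(List<String> jarNames) {
        Map<String, String> versions = new TreeMap<>();
        for (String jar : jarNames) {
            Matcher m = FLINK_JAR.matcher(jar);
            if (m.matches()) {
                versions.put(m.group(1), m.group(2));
            }
        }
        return versions;
    }

    // True if the Flink jars in the list do not all share one version.
    static boolean hasMixedVersions(List<String> jarNames) {
        return new HashSet<>(flinkVersions(jarNames).values()).size() > 1;
    }

    public static void main(String[] args) {
        // Jar names assumed from the dependency list in the question.
        List<String> jars = Arrays.asList(
                "flink-java-1.3.2.jar",
                "flink-streaming-core-0.9.1.jar",
                "flink-clients-0.10.2.jar",
                "flink-connector-kafka-0.9_2.11-1.3.2.jar",
                "json-simple-1.1.jar");
        flinkVersions(jars).forEach((artifact, version) ->
                System.out.println(artifact + " -> " + version));
        System.out.println("mixed Flink versions: " + hasMixedVersions(jars));
    }
}
```

Non-Flink jars such as json-simple are ignored, so only the Flink modules are compared against each other.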

Upvotes: 1

user4856296

Reputation:


Your code compiles but you get a NoClassDefFoundError, so I think one of your dependencies is missing its compile or runtime dependencies, possibly because Maven's automatic download from the POM did not fetch them.

That is probably the root cause of the NoClassDefFoundError.

Solution: clean and build.

Upvotes: 0
