Reputation: 720
I am trying to stream data from Kafka using Flink. My code compiles without errors, but on running I get the following error:
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
My POM dependency list is as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-core</artifactId>
        <version>0.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>0.10.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>
</dependencies>
The Java code that I am trying to run just subscribes to a Kafka topic called 'streamer':
import java.util.Properties;
import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class StreamConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "samplegroup");

        // Subscribe to the 'streamer' topic, deserializing each record as a plain string
        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer09<String>("streamer", new SimpleStringSchema(), properties));

        // Prefix every record and print it to stdout
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Streamed data: " + value;
            }
        }).print();

        env.execute();
    }
}
System information:
1. Kafka version: 0.9.0.1
2. Flink version: 1.3.2
3. OpenJDK version: 1.8
Although I am using Maven, I do not think this is a Maven issue, because I get the same error even without Maven. I manually downloaded all the necessary .jar files to a folder and specified that folder path with the -cp option when compiling with javac. Compilation succeeds, but I get the same error as above at runtime.
Upvotes: 5
Views: 17043
Reputation: 156
The cause can also be that the job is started via IntelliJ, which by default does not include dependencies with 'provided' scope in the classpath. This is a common problem.
Go to the Run/Debug Configuration in IntelliJ, click on 'Modify options' and select 'Add dependencies with "provided" scope to classpath'.
Upvotes: 4
Reputation: 720
I figured out the reason, and it seems like a really silly error now: the jar packages were available at compile time but not at run time. I ended up not using Maven at all. I compiled with javac -cp <path_to_jar_files>
and then executed with the same classpath, java -cp <path_to_jar_files>
Upvotes: 1
Reputation: 1469
Try removing the scope if you have specified it somewhere in your pom file, as it restricts the availability of the class files at run time.
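For illustration, a dependency declared like this (a hypothetical fragment, not taken from the question's POM) compiles fine but is left off the runtime classpath:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.2</version>
    <!-- 'provided' = on the compile classpath only; expected to be supplied at run time -->
    <scope>provided</scope>
</dependency>
Deleting the <scope> element makes the dependency default to compile scope, which is available at run time as well.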
Upvotes: 3
Reputation: 1819
It looks like the first problem with your pom is that you use different versions for your Flink imports. Try to use the same, newer version 1.3.2 for all Flink modules. This error often occurs when you use incompatible or multiple versions of libraries.
Try using the following dependencies (assuming you are using Scala 2.11):
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>
</dependencies>
If you still have the same problem, post example code so I can reproduce the error.
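If aligning the versions alone does not help when you run the job outside the IDE, a common approach (a sketch on my part, not part of the original answer) is to package a fat jar with the maven-shade-plugin, so every compile-scope dependency ends up on the runtime classpath:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <!-- bind to the package phase so 'mvn package' produces the shaded jar -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
You can then run the class directly against the shaded jar, e.g. java -cp target/<your-artifact>.jar StreamConsumer, without assembling a classpath folder by hand.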
Upvotes: 1
Reputation:
NoClassDefFoundError when running Flink with the Kafka connector
Your code compiles and you are getting a NoClassDefFoundError at run time, so I think one of your dependency libraries is missing its compile or runtime dependencies; they were probably not downloaded correctly by Maven from the pom.
That is most likely the root cause of the NoClassDefFoundError.
Solution: clean and build, e.g. mvn clean package, so Maven re-resolves and re-downloads the dependencies.
Upvotes: 0