Reputation: 311
I am trying to read data from a Kafka topic in Flink streaming. I am running the following example code, taken from the "Apache Kafka Connector" page of the Apache Flink 1.1.3 documentation:
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class stock_streaming_kafka {
    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties);
        DataStream<String> stream = env
            .addSource(myConsumer)
            .print();
    }
}
I get the following error:
Exception in thread "main" java.lang.Error: Unresolved compilation problems:
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>)
at stock_streaming_kafka.main(stock_streaming_kafka.java:25)
Can you please guide me on how to fix this? Is there a dependency issue with the Kafka connector? My versions are:
Upvotes: 2
Views: 2833
Reputation: 509
Since the answer is not accepted yet, here is a complete Maven code example to read data from Kafka using Flink.
You may need to tweak the pom.xml to match your setup of Kafka and Scala versions.
Hope this helps.
Upvotes: 0
Reputation: 1662
Please use the versions below. They will work with your Kafka version.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.1.3</version>
</dependency>
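I also see a compilation issue in the code: print() returns a DataStreamSink, not a DataStream, so its result cannot be assigned to a DataStream<String> variable.
Change this:
DataStream<String> stream = env
    .addSource(myConsumer)
    .print();
to:
DataStream<String> stream = env
    .addSource(myConsumer);
stream.print();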
If it still doesn't work for you then please let me know and I will share the working code.
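To illustrate why the chained form does not compile, here is a minimal sketch that uses stand-in classes only, with no Flink dependency. `Stream`, `Sink`, and `addSource` below are hypothetical names that merely mimic the shape of Flink's `DataStream`, `DataStreamSink`, and `env.addSource(...)`; the sample records are made up as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PrintChainingSketch {

    // Hypothetical stand-in for Flink's DataStreamSink: the terminal end of a pipeline.
    static final class Sink<T> {
        final List<T> printed;

        Sink(List<T> printed) {
            this.printed = printed;
        }
    }

    // Hypothetical stand-in for Flink's DataStream.
    static final class Stream<T> {
        private final List<T> elements;

        Stream(List<T> elements) {
            this.elements = new ArrayList<>(elements);
        }

        // Like DataStream.print(), this returns a sink type, not another stream.
        Sink<T> print() {
            for (T element : elements) {
                System.out.println(element);
            }
            return new Sink<>(elements);
        }
    }

    // Hypothetical stand-in for env.addSource(...).
    static Stream<String> addSource(List<String> records) {
        return new Stream<>(records);
    }

    public static void main(String[] args) {
        // This would not compile, because print() yields a Sink<String>:
        // Stream<String> bad = addSource(Arrays.asList("a")).print();

        // Keep the stream in its own variable, then attach the sink.
        Stream<String> stream = addSource(Arrays.asList("record-1", "record-2"));
        Sink<String> sink = stream.print();
        System.out.println("sink received " + sink.printed.size() + " records");
    }
}
```

In a real Flink job the pipeline is also only executed once `env.execute()` is called after the sources and sinks are defined, which the code in the question does not do yet.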
Upvotes: 1
Reputation: 18987
The versions of Flink and the Flink connector must match.
Update the flink-connector dependency to 1.1.3.
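One way to check whether the mismatch is what triggers the "cannot be resolved" error is to test which of the classes named in the question are actually visible on the project's classpath. The sketch below is only a diagnostic aid, not part of Flink; `ClasspathCheck` and `isOnClasspath` are hypothetical names, and it assumes you run it with the same classpath as your project (for example from the same IDE project).

```java
public class ClasspathCheck {

    // Returns true if the named class can be loaded by this class's class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only locate and load the class, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. when a superclass is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the imports and the error message in the question.
        String[] names = {
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09",
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase"
        };
        for (String name : names) {
            System.out.println((isOnClasspath(name) ? "found    " : "MISSING  ") + name);
        }
    }
}
```

If `FlinkKafkaConsumer09` is found but `FlinkKafkaConsumerBase` is missing, the connector jar is present without the jar that provides its base class, which is consistent with the dependencies not being resolved as a matching set.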
Upvotes: 0