kadsank

Reputation: 311

Flink with Kafka as a source

I am trying to read data from a Kafka topic in Flink streaming. I am running the following example code, taken from the "Apache Kafka Connector" page of the Apache Flink 1.1.3 documentation:

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class stock_streaming_kafka {

    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties);

        DataStream<String> stream = env
            .addSource(myConsumer)
            .print();
    }

}

I get the following error:

Exception in thread "main" java.lang.Error: Unresolved compilation problems: 
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>)

at stock_streaming_kafka.main(stock_streaming_kafka.java:25)

Can you please guide me on how to fix this? Is there a dependency issue with the Kafka connector? My versions are:

  1. Flink 1.1.3
  2. Kafka 2.10
  3. flink-connector-kafka-0.9_2.11-1.0.0.jar

Upvotes: 2

Views: 2833

Answers (3)

Tanmay Deshpande

Reputation: 509

Since the answer is not accepted yet, here is a complete Maven code example to read data from Kafka using Flink.

You may need to tweak the pom.xml to match your setup of Kafka and Scala versions.

Hope this helps.

Upvotes: 0

Vikram Rawat

Reputation: 1662

Please use the versions below. They will work with your Kafka version.

        <!-- The Flink version and the Scala suffix (_2.11) should be the same on every Flink artifact -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.1.3</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>1.1.3</version>
        </dependency>

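I also see a compilation issue in the code: print() returns a DataStreamSink<String>, not a DataStream<String>, so the result of the chained call cannot be assigned to a DataStream<String> variable.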

Change this:

DataStream<String> stream = env
        .addSource(myConsumer)
        .print();

to:

DataStream<String> stream = env
        .addSource(myConsumer);
stream.print();
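
To see why the original assignment fails without pulling in Flink at all, here is a minimal sketch. MiniStream and MiniSink are hypothetical stand-in classes (they are not part of Flink's API); they only mirror the relationship between DataStream and DataStreamSink, where print() returns the sink type rather than the stream type.

```java
public class PrintReturnTypeDemo {

    // Hypothetical stand-in for Flink's DataStreamSink: the terminal end of a pipeline.
    static class MiniSink<T> {
        final String description;

        MiniSink(String description) {
            this.description = description;
        }
    }

    // Hypothetical stand-in for Flink's DataStream.
    static class MiniStream<T> {
        final String name;

        MiniStream(String name) {
            this.name = name;
        }

        // Like DataStream.print(), this returns a sink, not another stream.
        MiniSink<T> print() {
            return new MiniSink<>("print sink for " + name);
        }
    }

    // Hypothetical stand-in for env.addSource(...).
    static MiniStream<String> addSource(String topic) {
        return new MiniStream<>(topic);
    }

    public static void main(String[] args) {
        // The next line would not compile, for the same reason as the code in the question:
        // MiniStream<String> broken = addSource("nsestocks4k").print();

        // Keeping the stream and the sink in separate statements compiles.
        MiniStream<String> stream = addSource("nsestocks4k");
        MiniSink<String> sink = stream.print();
        System.out.println(sink.description);
    }
}
```

The fix above follows the same pattern: assign the result of addSource to the DataStream variable first, then call print() as its own statement.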

If it still doesn't work for you then please let me know and I will share the working code.

Upvotes: 1

Fabian Hueske

Reputation: 18987

The versions of Flink and the Flink connector must match. You are running Flink 1.1.3 with version 1.0.0 of the connector, so update the flink-connector-kafka-0.9 dependency to 1.1.3.
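
If you want to confirm whether the class named in the error is actually visible to your program, a small check like the one below may help. This is only a diagnostic sketch: ClasspathCheck and isOnClasspath are names made up for illustration, and the two class names are taken from the question's import and error message. What it prints depends entirely on which jars are on your classpath.

```java
public class ClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    // The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, which occurs when the class
            // is found but something it depends on is not.
            return false;
        }
    }

    public static void main(String[] args) {
        String[] classNames = {
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09",
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase"
        };
        for (String className : classNames) {
            System.out.println(className + " -> "
                    + (isOnClasspath(className) ? "found" : "missing"));
        }
    }
}
```

If the second class is reported as missing, the jars on the build path do not provide everything the connector needs. Letting Maven resolve the matching connector version, rather than adding a single jar by hand, is one way to address that.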

Upvotes: 0
