Joe

Reputation: 33

Does Spark Streaming support Kafka 1.1.0 now?

The current Spark version is 2.3. I have checked the Maven Central repository: https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22

The jar shown there is spark-streaming-kafka-0-10_2.11.

So Kafka 1.1.0 isn't supported yet?

Should I still install Kafka 0.10.x?

Upvotes: 3

Views: 1313

Answers (2)

Joe

Reputation: 33

I have tested Spark 2.3 with Kafka 1.1.0 using this dependency:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
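Note that `${spark.version}` is a Maven property and must be defined in the POM's `<properties>` section; a minimal sketch, assuming 2.3.0 to match Spark 2.3:

    <properties>
        <spark.version>2.3.0</spark.version>
    </properties>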

It runs well.

Example code:

    // Required imports (not shown in the original snippet):
    // import java.util.Arrays; import java.util.HashMap;
    // import java.util.List; import java.util.Map;
    // import org.apache.kafka.clients.consumer.ConsumerRecord;
    // import org.apache.kafka.common.serialization.StringDeserializer;
    // import org.apache.spark.SparkConf;
    // import org.apache.spark.streaming.Durations;
    // import org.apache.spark.streaming.api.java.*;
    // import org.apache.spark.streaming.kafka010.*;

    SparkConf conf = new SparkConf().setAppName("stream test").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "master:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("enable.auto.commit", false);

    List<String> topics = Arrays.asList("A29");

    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );

    JavaDStream<String> lines = stream.map(ConsumerRecord::value);

    lines.print(30);

    streamingContext.start();
    streamingContext.awaitTermination();
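Since `enable.auto.commit` is set to `false` above, this example never commits offsets back to Kafka. If you want consumer progress tracked there, Spark's kafka010 API lets you commit the processed offset ranges yourself; a sketch to place before `streamingContext.start()`, where `HasOffsetRanges`, `CanCommitOffsets`, and `OffsetRange` come from `org.apache.spark.streaming.kafka010`:

    stream.foreachRDD(rdd -> {
        // Offset ranges that this micro-batch covered
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // Commit them back to Kafka asynchronously
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });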

Upvotes: 0

Lana Nova

Reputation: 360

You should use spark-streaming-kafka-0-10 for Kafka 0.10.0 or higher; the "0-10" in the artifact name is the minimum supported broker version, not an exact version requirement.

Upvotes: 1
