No messages consumed from spark-streaming-kafka-0-10 with the kafka.bootstrap.servers option

I'm using Kafka 1.0.1-kafka-3.1.0-SNAPSHOT from CDH (Cloudera's distribution of Hadoop).

On my batch-1 edge server, I can produce messages with:

kafka-console-producer --broker-list batch-1:9092 --topic MyTopic

I can consume messages via Zookeeper on my first node with:

kafka-console-consumer --zookeeper data1:2181 --topic MyTopic --from-beginning

But I get nothing with the --bootstrap-server option:

kafka-console-consumer --bootstrap-server batch-1:9092 --topic MyTopic --from-beginning

The problem is that I'm using Kafka from Spark:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"

val df = spark.readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("kafka.bootstrap.servers", "batch-1:9092")
  .option("subscribe", "MyTopic")
  .load()

println("Select :")

val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
  .as[(String, String, String)]

println("Show :")

val query = df2.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
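For reference, a minimal sketch of the same source using the "kafka" shorthand format and an explicit startingOffsets option, both documented in the Structured Streaming Kafka integration guide (by default the source starts at the latest offsets, so only messages produced after the query starts would show up):

// Same Kafka source, written with the shorthand format name.
// startingOffsets defaults to "latest"; "earliest" also reads existing messages.
val dfFromStart = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "batch-1:9092")
  .option("subscribe", "MyTopic")
  .option("startingOffsets", "earliest")
  .load()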

I did an export SPARK_KAFKA_VERSION=0.10 on my edge node. Then:

spark2-submit --driver-memory 2G --jars spark-sql-kafka-0-10_2.11-2.3.0.cloudera4.jar --class "spark.streaming.Poc" poc_spark_kafka_2.11-0.0.1.jar

This forces me to use kafka.bootstrap.servers; it seems to connect, but I can't get any messages.

The output is the same as with kafka-console-consumer and the --bootstrap-server option:

18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka commitId : unknown
18/10/30 16:11:48 INFO streaming.MicroBatchExecution: Starting new streaming query.

Then, nothing. Should I connect to Zookeeper? How?

Is there a version conflict, even though the guide says "Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)" here: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html?

What did I miss?

Upvotes: 1

Views: 1086

Answers (1)

SOLUTION

The /var/log/kafka/kafka-broker-batch-1.log said:

2018-10-31 13:40:08,284 ERROR kafka.server.KafkaApis: [KafkaApi-51] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
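For a single-broker development setup, an alternative sketch (assuming the broker configuration is editable, e.g. through the Kafka service settings in Cloudera Manager) would be to lower the replication factor required for the internal __consumer_offsets topic so it can be created with one broker:

# server.properties (hypothetical single-broker override)
offsets.topic.replication.factor=1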

So I deployed 3 brokers on my cluster nodes, with a gateway on the edge node. Now it works with:

kafka-console-producer --broker-list data1:9092,data2:9092,data3:9092 --topic Test

kafka-console-consumer --bootstrap-server data1:9092 --topic Test --from-beginning

Spark works fine too.
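If needed, the internal offsets topic can be checked once the brokers are up (an example reusing the Zookeeper address from the question):

kafka-topics --zookeeper data1:2181 --describe --topic __consumer_offsets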

Upvotes: 2
