Reputation: 85
I'm using Kafka 1.0.1-kafka-3.1.0-SNAPSHOT from CDH (Cloudera's distribution including Apache Hadoop).
On my batch-1 edge server, I can produce messages with:
kafka-console-producer --broker-list batch-1:9092 --topic MyTopic
I can consume messages through ZooKeeper on my first node with:
kafka-console-consumer --zookeeper data1:2181 --topic MyTopic --from-beginning
But I get nothing with the --bootstrap-server option:
kafka-console-consumer --bootstrap-server batch-1:9092 --topic MyTopic --from-beginning
The problem is that I'm using Kafka from Spark:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
val df = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "batch-1:9092")
.option("subscribe", "MyTopic")
.load()
println("Select :")
val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
.as[(String, String, String)]
println("Show :")
val query = df2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
I did an export SPARK_KAFKA_VERSION=0.10 on my edge node, then:
spark2-submit --driver-memory 2G --jars spark-sql-kafka-0-10_2.11-2.3.0.cloudera4.jar --class "spark.streaming.Poc" poc_spark_kafka_2.11-0.0.1.jar
This forces me to use kafka.bootstrap.servers. It seems to connect, but I can't get any message. The output is the same as kafka-console-consumer with the --bootstrap-server option:
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka commitId : unknown
18/10/30 16:11:48 INFO streaming.MicroBatchExecution: Starting new streaming query.
Then, nothing. Should I connect to ZooKeeper? How?
Is there a version conflict, even though the docs say "Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)" here: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html ?
What did I miss?
Upvotes: 1
Views: 1086
Reputation: 85
SOLUTION
The /var/log/kafka/kafka-broker-batch-1.log said:
2018-10-31 13:40:08,284 ERROR kafka.server.KafkaApis: [KafkaApi-51] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
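The error means the new-style consumer cannot create its internal __consumer_offsets topic, because the cluster has fewer brokers than the configured replication factor. Adding brokers (as below) is the proper fix; for a throwaway single-broker dev setup, an alternative sketch would be lowering the broker-side setting to match (standard Kafka broker property, set in server.properties or the CDH Kafka service configuration):

```properties
# Only for a 1-broker dev/test cluster; keep >= 3 brokers in production.
# Must be set BEFORE __consumer_offsets is first created.
offsets.topic.replication.factor=1
```

Note that this has no effect once the offsets topic already exists; the topic would have to be recreated.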
So I deployed 3 brokers on my cluster nodes, with a gateway on the edge. Now it works with:
kafka-console-producer --broker-list data1:9092,data2:9092,data3:9092 --topic Test
kafka-console-consumer --bootstrap-server data1:9092 --topic Test --from-beginning
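To confirm the internal offsets topic was created with the expected replication (a check I would run against the cluster; flags are from the standard kafka-topics CLI, and data1:2181 is this cluster's ZooKeeper):

```shell
# Describe the internal consumer-offsets topic; ReplicationFactor
# should now match offsets.topic.replication.factor (3 here).
kafka-topics --zookeeper data1:2181 --describe --topic __consumer_offsets
```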
Spark works fine too.
Upvotes: 2