Reputation: 113
I use Spark 2.2.1, kafka_2.12-1.0.0 and Scala to get some JSON data from Kafka; however, I only connect to Kafka and get no data output.
Here is my Scala code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

def main(args: Array[String]) {
  val spark = SparkSession
    .builder()
    .appName("Spark structured streaming Kafka example")
    .master("local[2]")
    .getOrCreate()

  val inputstream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "behavior")
    .option("group.id", "test-consumer-group")
    .option("startingOffsets", "earliest")
    .load()

  import spark.implicits._
  println("===============================================================")

  val query = inputstream // select($"data")
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .outputMode("append")
    .format("console")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .start()

  println("===============================================================" + query.isActive)
  query.awaitTermination()
}
Here is my pom.xml:
<properties>
  <spark.version>2.2.0</spark.version>
  <scala.version>2.11.6</scala.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>0.10.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
  </dependency>
</dependencies>
When I run this code, the console doesn't show any data from Kafka.
Here is the console output:
===============================================================
18/03/12 17:00:47 INFO SparkSqlParser: Parsing command: CAST(key AS STRING)
18/03/12 17:00:47 INFO SparkSqlParser: Parsing command: CAST(value AS STRING)
18/03/12 17:00:48 INFO StreamExecution: Starting [id = 6648f18e-3ecd-4046-85ee-932fffaab70c, runId = cb6a9ae9-9460-4232-b8ed-342d48c2e524]. Use /D:/data/kafka to store the query checkpoint.
===============================================================true
18/03/12 17:00:48 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Discovered coordinator KB2CMVMCIWDJT61.localdomain:9092 (id: 2147483647 rack: null) for group spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0.
Marking the coordinator KB2CMVMCIWDJT61.localdomain:9092 (id: 2147483647 rack: null) dead for group spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0
The output only says that my consumer group is dead. My Kafka itself works fine: I can use the console consumer to read data from the topic "behavior" (the command I used is sketched below), so Kafka and the topic don't seem to be the problem. I'm a novice with Spark Structured Streaming and Kafka, and I hope to get your help.
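For reference, the console check was along these lines (assuming the scripts shipped with the standard Kafka distribution and a broker on localhost:9092):

  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic behavior --from-beginning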
Upvotes: 2
Views: 3822
Reputation: 786
You're not supposed to set group.id with Structured Streaming. Under the Kafka-specific configurations in the docs:
group.id: Kafka source will create a unique group id for each query automatically.
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
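As a minimal sketch, the source from the question with the group.id option removed (same broker and topic assumed) would look like this; the Kafka source generates a unique group id per query on its own:

  val inputstream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "behavior")
    // no group.id here: the source assigns a unique one per query
    .option("startingOffsets", "earliest")
    .load()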
Upvotes: 2
Reputation: 41
The problem is in the Kafka end. Try restarting zookeeper. Does the coordinator dead error is recurring or it comes only once?
If it comes only once then there is connection issue and your spark is not getting connected to Kafka. See if Kafka and zookeeper are properly set up on your localhost. If it is recurring, it means it is getting connected but there is some other issue, in this case, try restarting zookeeper.
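As a quick connectivity check (a sketch, assuming the standard Kafka distribution scripts and ZooKeeper on localhost:2181), you can list the broker ids registered in ZooKeeper; an empty list means no broker is connected:

  bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids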
Upvotes: 0