gonefuture

Reputation: 113

Spark structured streaming can't get Kafka data

I use Spark 2.2.1, kafka_2.12-1.0.0 and Scala to read some JSON data from Kafka; however, the query connects to Kafka but no data is ever output.

Here is my Scala code:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .appName("Spark structured streaming Kafka example")
        .master("local[2]")
        .getOrCreate()

      val inputstream = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "behavior")
        .option("group.id", "test-consumer-group")
        .option("startingOffsets", "earliest")
        .load()

      import spark.implicits._
      println("===============================================================")
      val query = inputstream // select($"data")
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .outputMode("append")
        .format("console")
        .trigger(Trigger.ProcessingTime("2 seconds"))
        .start()

      println("===============================================================" + query.isActive)
      query.awaitTermination()
    }

Here is my pom.xml:

  <properties>
    <spark.version>2.2.0</spark.version>
    <scala.version>2.11.6</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>0.10.2.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
      <version>2.2.1</version>
    </dependency>


  </dependencies>

When I run this code, the console doesn't show any data from Kafka.

Here is the console output:

    ===============================================================
    18/03/12 17:00:47 INFO SparkSqlParser: Parsing command: CAST(key AS STRING)
    18/03/12 17:00:47 INFO SparkSqlParser: Parsing command: CAST(value AS STRING)
    18/03/12 17:00:48 INFO StreamExecution: Starting [id = 6648f18e-3ecd-4046-85ee-932fffaab70c, runId = cb6a9ae9-9460-4232-b8ed-342d48c2e524]. Use /D:/data/kafka to store the query checkpoint.
    ===============================================================true

    18/03/12 17:00:48 INFO ConsumerConfig: ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 1
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer



    Discovered coordinator KB2CMVMCIWDJT61.localdomain:9092 (id: 2147483647 rack: null) for group spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0.

    Marking the coordinator KB2CMVMCIWDJT61.localdomain:9092 (id: 2147483647 rack: null) dead for group spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0

The output only says that the coordinator for my consumer group is marked dead. Kafka itself works well: I can use the console consumer to read data from the topic "behavior". In short, Kafka and the topic don't seem to be the problem. I'm a novice with Spark Structured Streaming and Kafka and hope to get your help.

Upvotes: 2

Views: 3822

Answers (2)

maverik

Reputation: 786

You're not supposed to set group.id with Structured Streaming. From the Kafka-specific configurations section of the docs:

group.id: Kafka source will create a unique group id for each query automatically.

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
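For reference, a minimal sketch of the same source definition with the group.id option simply dropped (broker address and topic taken from the question):

    // same source as in the question, but without .option("group.id", ...);
    // Spark generates a unique consumer group id per query on its own
    val inputstream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "behavior")
      .option("startingOffsets", "earliest")
      .load()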

Upvotes: 2

Deepansh Goyal

Reputation: 41

The problem is on the Kafka end. Try restarting ZooKeeper. Is the coordinator-dead error recurring, or does it appear only once?

If it appears only once, there is a connection issue and Spark is not getting connected to Kafka; check that Kafka and ZooKeeper are properly set up on your localhost. If it is recurring, Spark is connecting but something else is wrong; in that case, try restarting ZooKeeper.
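To rule out basic connectivity separately from Spark, one option is to ask the broker directly for the partition metadata of the "behavior" topic with a plain Kafka consumer. This is only an illustrative sketch (the object name ConnectivityCheck and the plain-consumer approach are mine, not from the answer); it uses the broker address and topic from the question:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.StringDeserializer

    // Hypothetical connectivity check: if partitionsFor() returns, the broker
    // at localhost:9092 is reachable and topic metadata can be fetched.
    object ConnectivityCheck {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.deserializer", classOf[StringDeserializer].getName)
        props.put("value.deserializer", classOf[StringDeserializer].getName)

        val consumer = new KafkaConsumer[String, String](props)
        try {
          val partitions = consumer.partitionsFor("behavior")
          println(s"Topic 'behavior' has ${partitions.size()} partition(s)")
        } finally {
          consumer.close()
        }
      }
    }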

Upvotes: 0
