NateH06
NateH06

Reputation: 3574

Can produce to Kafka but cannot consume

I'm using the Kafka JDK client ver 0.10.2.1 . I am able to produce simple messages to Kafka for a "heartbeat" test, but I cannot consume a message from that same topic using the sdk. I am able to consume that message when I go into the Kafka CLI, so I have confirmed the message is there. Here's the function I'm using to consume from my Kafka server, with the props - I pass the message I produced to the topic only after I have indeed confirmed the produce() was succesful, I can post that function later if requested:

private def consumeFromKafka(topic: String, expectedMessage: String): Boolean = {
    val props: Properties = initProps("consumer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(List(topic).asJava)
    var readExpectedRecord = false
    try {
      val records = {
        val firstPollRecs = consumer.poll(MAX_POLLTIME_MS)
        // increase timeout and try again if nothing comes back the first time in case system is busy
        if (firstPollRecs.count() == 0) firstPollRecs else {
          logger.info("KafkaHeartBeat: First poll had 0 records- trying again - doubling timeout to "
            + (MAX_POLLTIME_MS * 2)/1000 + " sec.")
          consumer.poll(MAX_POLLTIME_MS * 2)
        }
      }
      records.forEach(rec => {
        if (rec.value() == expectedMessage) readExpectedRecord = true
      })
    } catch {
      case e: Throwable => //log error
    } finally {
      consumer.close()
    }
    readExpectedRecord
  }

private def initProps(propsType: String): Properties = {
    val prop = new Properties()
    prop.put("bootstrap.servers", kafkaServer + ":" + kafkaPort)

    propsType match {
      case "producer" => {
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        prop.put("acks", "1")
        prop.put("producer.type", "sync")
        prop.put("retries", "3")
        prop.put("linger.ms", "5")
      }
      case "consumer" => {
        prop.put("group.id", groupId)
        prop.put("enable.auto.commit", "false")
        prop.put("auto.commit.interval.ms", "1000")
        prop.put("session.timeout.ms", "30000")
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // poll just once, should only be one record for the heartbeat
        prop.put("max.poll.records", "1")
      }
    }
    prop
  }

Now when I run the code, here's what it outputs in the console:

13:04:21 - Discovered coordinator serverName:9092 (id: 2147483647

rack: null) for group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e. 13:04:23

INFO o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned

partitions [] for group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:24

INFO o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group

0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:25 INFO

o.a.k.c.c.i.AbstractCoordinator - Successfully joined group

0b8947e1-eb68-4af3-ac7b-be3f7c02e76e with generation 1 13:04:26 INFO

o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions

[HeartBeat_Topic.Service_5.2018-08-03.13_04_10.377-0] for group

0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:27 INFO

c.p.p.l.util.KafkaHeartBeatUtil - KafkaHeartBeat: First poll had 0

records- trying again - doubling timeout to 60 sec.

And then nothing else, no errors thrown -so no records are polled. Does anyone have any idea what's preventing the 'consume' from happening? The subscriber seems to be successful, as I'm able to successfully call the listTopics and list partions no problem.

Upvotes: 0

Views: 417

Answers (1)

mjuarez
mjuarez

Reputation: 16824

Your code has a bug. It seems your line:

 if (firstPollRecs.count() == 0) 

Should say this instead

 if (firstPollRecs.count() > 0) 

Otherwise, you're passing in an empty firstPollRecs, and then iterating over that, which obviously returns nothing.

Upvotes: 1

Related Questions