Andrew Simonds

Reputation: 31

How to resolve Kafka Consumer poll timeout error

I am running Apache Kafka in a Vagrant machine and trying to run a simple Kafka consumer program against it. The program gets stuck before the for loop, when it calls .poll(100).

I have dug into the underlying classes while debugging, but have not found much.

import java.util
import java.util.{Properties, UUID}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._ // provides .asScala

val TOPIC = "testTopic"

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.10:9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// A random group id, so every run starts as a new consumer group
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString)

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(util.Collections.singletonList(TOPIC))

while (true) {
  println("Test")
  val records = consumer.poll(100) // the program hangs on this call
  for (record <- records.asScala) {
    println(record)
  }
  println("Test2")
}

Currently it prints Test and then gets stuck with no error message. I expected it to print the contents of the Kafka topic.

Upvotes: 3

Views: 2966

Answers (1)

ruifgmonteiro

Reputation: 709

You need to upgrade your kafka-clients version to 2.0.0 or above, which adds a poll method that takes a Duration. The older poll(long) can block beyond its timeout: when the Kafka server is down or unreachable, for example, the call gets stuck in an internal loop waiting for the broker to become available again.
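If the project is built with sbt (an assumption, since the question does not say which build tool is used), the upgrade could look roughly like the fragment below. The version 2.0.0 is simply the minimum named above; any later release should also provide poll(Duration).

```scala
// build.sbt (assumed build tool; adapt for Maven or Gradle)
// kafka-clients is a plain Java artifact, so use % rather than %%.
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
```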

According to the KafkaConsumer Javadoc, which refers to KIP-266:

ConsumerRecords poll(long timeout)

Deprecated. Since 2.0. Use poll(Duration), which does not block beyond the timeout awaiting partition assignment. See KIP-266 for more information.

In your case:

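import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._ // provides .asScala (scala.jdk.CollectionConverters._ on Scala 2.13+)

// ...
// poll(Duration) takes a java.time.Duration, not a scala.concurrent.duration.Duration
val timeout = Duration.ofMillis(100)

while (true) {
  println("Test")
  // Bounded by the timeout, even while awaiting partition assignment
  val records = consumer.poll(timeout)
  for (record <- records.asScala) {
    println(record)
  }
  println("Test2")
}

// ...

In conclusion, upgrade the kafka-clients dependency so that KafkaConsumer offers the new poll method, and pass the timeout to it as a java.time.Duration instead of a long.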
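One thing to watch for in Scala: poll(Duration) expects a java.time.Duration, while scala.concurrent.duration._ provides a different Duration type that the method will not accept. If the rest of your code already passes Scala durations around, a small conversion helper is one way to bridge the two. The sketch below uses only the standard library; DurationBridge and toJavaDuration are hypothetical names chosen for illustration, not part of Kafka or Scala.

```scala
import java.time.{Duration => JDuration}
import scala.concurrent.duration._

object DurationBridge {
  // Hypothetical helper: convert a Scala FiniteDuration into the
  // java.time.Duration that KafkaConsumer.poll(Duration) expects.
  def toJavaDuration(d: FiniteDuration): JDuration =
    JDuration.ofNanos(d.toNanos)

  def main(args: Array[String]): Unit = {
    val timeout = toJavaDuration(100.millis)
    println(timeout.toMillis) // prints 100
    // consumer.poll(timeout) would accept this value.
  }
}
```

On Scala 2.13 and later, scala.jdk.DurationConverters provides the same conversion as a .toJava extension method, so the helper may not be needed there.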

Upvotes: 1
