Reputation: 31
I am trying to use Apache Kafka through a vagrant machine to run a simple Kafka Consumer program. The program get's stuck before the for loop when it tries to call the .poll(100) method.
Lot's of digging into deeper classes for debugging but not much has been found.
val TOPIC="testTopic"
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.10:9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(TOPIC))
while(true) {
println("Test")
val records = consumer.poll(100)
for (record <- records.asScala) {
println(record)
}
println("Test2")
}
}
Currently outputs Test and then get's stuck with no error message. It's expected that it will output the contents of the Kafka topic.
Upvotes: 3
Views: 2966
Reputation: 709
You need to upgrade your kafka-clients version to 2.0.0 or above. When the kafka server is down, for example, using the poll method from KafkaConsumer
class you will get stuck in the internal loop waiting for the broker to become available again.
According to KIP-266:
ConsumerRecords
poll(long timeout)
Deprecated. Since 2.0. Use poll(Duration), which does not block beyond the timeout awaiting partition assignment. See KIP-266 for more information.
In your case:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.concurrent.duration._
// ...
val timeout = Duration(100, MILLISECONDS)
while(true) {
println("Test")
val records = consumer.poll(timeout)
for (record <- records.asScala) {
println(record)
}
println("Test2")
}
//...
In conclusion, you just need to import the new version of the KafkaConsumer
class and pass the timeout parameter to the new poll method as an instance of the Duration
object.
Upvotes: 1