Reputation: 81
if I run the kafka
consumer inside a thread without manipulating it from outside, like put the consumer on sleep or wake him up, it is necessary to handle the WakeupException
properly? And what is a good approach to handle it?
The consumer is running on an webservice for pulling constantly data out of a queue and should never stop doing it. Furthermore the service has no idle or suspend state. In the documentation of Kafka it is pointed out that the exception is only thrown when the kafka consumer is blocked by another thread, but that will never happen. https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html
Kafka Version 0.10.0.0
catch (WakeupException e) {
LOG.info("Kafka Consulmer wakeup exception");
// Ignore exception if closing
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
Regards, Rakesh
Upvotes: 8
Views: 18489
Reputation: 1727
You can find few examples on the Confluent document which show how to properly handle the WakeupException [Confluent Consumer Doc]
Basically, if you use consumer.poll(Integer.MAX_VALUE)
the consumer will block until a message is fetched. In this case, if you would like to stop consumption you can call consumer.wakeup()
(from an other thread) and catch the exception to shutdown properly.
Also, while commiting your offsets synchronously a call to consumer.wakeup()
will throw a WakeupException
.
Upvotes: 8