Rakesh Kachhadiya

Reputation: 81

Kafka Consumer WakeupException Handling Java

If I run the Kafka consumer inside a thread and never manipulate it from outside (for example, by putting the consumer to sleep or waking it up), is it necessary to handle the WakeupException properly? And what is a good approach to handling it?

The consumer runs in a web service that constantly pulls data from a queue and should never stop doing so. Furthermore, the service has no idle or suspend state. The Kafka documentation points out that the exception is only thrown when a blocking operation of the Kafka consumer is preempted by another thread, but that will never happen. https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html

Kafka Version 0.10.0.0

catch (WakeupException e) {
    LOG.info("Kafka consumer wakeup exception");
    // Ignore exception if closing
    if (!closed.get()) {
        throw e;
    }
} finally {
    consumer.close();
}

Regards, Rakesh

Upvotes: 8

Views: 18489

Answers (1)

fhussonnois

Reputation: 1727

You can find a few examples in the Confluent documentation that show how to handle the WakeupException properly [Confluent Consumer Doc].

Basically, if you use consumer.poll(Integer.MAX_VALUE), the consumer blocks until a message is fetched. In that case, if you want to stop consuming, you can call consumer.wakeup() from another thread and catch the resulting exception to shut down cleanly.

Also, if consumer.wakeup() is called while you are committing offsets synchronously, the commit throws a WakeupException.
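The shutdown pattern described above can be sketched roughly as follows. To keep the sketch runnable without a broker or the kafka-clients jar, it uses hypothetical stand-ins: StubConsumer imitates only the poll(long), wakeup() and close() methods of KafkaConsumer, and StubWakeupException plays the role of WakeupException. The names ConsumerLoop, shutdown() and runDemo() are likewise assumptions made for illustration, not part of Kafka.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WakeupShutdownSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class StubWakeupException extends RuntimeException {}

    // Hypothetical stand-in for KafkaConsumer: poll() blocks, wakeup() aborts it.
    static class StubConsumer {
        final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        volatile boolean closeCalled = false;

        String poll(long timeoutMs) {
            long start = System.nanoTime();
            long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (System.nanoTime() - start < timeoutNanos) {
                // A pending wakeup request aborts the blocking call.
                if (wakeupRequested.getAndSet(false)) {
                    throw new StubWakeupException();
                }
                try {
                    String record = incoming.poll(10, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        return record;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
            return null;
        }

        void wakeup() { wakeupRequested.set(true); }

        void close() { closeCalled = true; }
    }

    static class ConsumerLoop implements Runnable {
        final StubConsumer consumer = new StubConsumer();
        final List<String> processed = new CopyOnWriteArrayList<>();
        final CountDownLatch firstRecord = new CountDownLatch(1);
        private final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    String record = consumer.poll(Long.MAX_VALUE);
                    if (record != null) {
                        processed.add(record);
                        firstRecord.countDown();
                    }
                }
            } catch (StubWakeupException e) {
                // Expected only during shutdown; otherwise rethrow as a real error.
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                // Always release the consumer, whichever way the loop ended.
                consumer.close();
            }
        }

        // Called from another thread, for example a shutdown hook.
        void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

    // Starts the loop, feeds one record, then requests shutdown from this thread.
    static boolean runDemo() {
        ConsumerLoop loop = new ConsumerLoop();
        Thread worker = new Thread(loop, "consumer-loop");
        worker.start();
        loop.consumer.incoming.add("record-1");
        try {
            loop.firstRecord.await(5, TimeUnit.SECONDS);
            loop.shutdown();
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive() && loop.consumer.closeCalled
                && loop.processed.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + runDemo());
    }
}
```

With the real client, the same structure would presumably apply with KafkaConsumer in place of StubConsumer. Setting the closed flag before calling wakeup() is what lets the catch block tell an intended shutdown from an unexpected wakeup.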

Upvotes: 8
