Reputation: 1135
I am trying to handle shutdown events in Kafka Consumers. API docs : https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/common/errors/WakeupException.html
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
Here why does the document expect us the handle if the boolean is false? if (!closed.get()) throw e;
If an external thread calls shutdown closed will be set to true always. Is there any use-case I am missing?
Upvotes: 2
Views: 1632
Reputation: 4564
Wakeup event may be also caused by consumer.wakeup()
that can be e.g. called by some other thread.
In that case, your code is going to throw this exception (instead of ignoring it if it was an ordinary shutdown). The exception could be then captured by the runner (e.g. ExecutionService), and then the userspace could do something with this information (effectively "consumer was woken up by 3rd party").
Upvotes: 2
Reputation: 778
I am summarizing how does the wakeup exception works.
Note that consumer.wakeup() is the only consumer method that is safe to call from a different thread. Calling wakeup will cause poll() to exit with WakeupException, or if consumer.wakeup() was called while the thread was not waiting on poll, the exception will be thrown on the next iteration when poll() is called.
The WakeupException doesn’t need to be handled, but before exiting the thread, you must call consumer.close().
Closing the consumer will commit off‐ sets if needed and will send the group coordinator a message that the consumer is leaving the group. The consumer coordinator will trigger rebalancing immediately
Full example is here https://github.com/gwenshap/kafka-examples/blob/master/SimpleMovingAvg/src/main/java/com/shapira/examples/newconsumer/simplemovingavg/SimpleMovingAvgNewConsumer.java
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
...
try {
// looping until ctrl-c, the shutdown hook will
cleanup on exit
while (true) {
ConsumerRecords<String, String> records =
movingAvg.consumer.poll(1000);
System.out.println(System.currentTimeMillis() + "
-- waiting for data...");
for (ConsumerRecord<String, String> record :
records) {
System.out.printf("offset = %d, key = %s,
value = %s\n",
record.offset(), record.key(),
record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at
position:" +
consumer.position(tp));
movingAvg.consumer.commitSync();
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
Upvotes: 2