Maclean Pinto
Maclean Pinto

Reputation: 1135

Kafka Consumer WakeupException Handling

I am trying to handle shutdown events in Kafka Consumers. API docs : https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/common/errors/WakeupException.html

public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;

     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }

     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 }

Here why does the document expect us the handle if the boolean is false? if (!closed.get()) throw e; If an external thread calls shutdown closed will be set to true always. Is there any use-case I am missing?

Upvotes: 2

Views: 1632

Answers (2)

Adam Kotwasinski
Adam Kotwasinski

Reputation: 4564

Wakeup event may be also caused by consumer.wakeup() that can be e.g. called by some other thread.

In that case, your code is going to throw this exception (instead of ignoring it if it was an ordinary shutdown). The exception could be then captured by the runner (e.g. ExecutionService), and then the userspace could do something with this information (effectively "consumer was woken up by 3rd party").

Upvotes: 2

ChristDist
ChristDist

Reputation: 778

I am summarizing how does the wakeup exception works.

  • If you are deciding to exit the poll loop, you will need another thread to call consumer.wakeup().
  • If you are running the consumer loop in the main thread, this can be done from ShutdownHook.

Note that consumer.wakeup() is the only consumer method that is safe to call from a different thread. Calling wakeup will cause poll() to exit with WakeupException, or if consumer.wakeup() was called while the thread was not waiting on poll, the exception will be thrown on the next iteration when poll() is called.

The WakeupException doesn’t need to be handled, but before exiting the thread, you must call consumer.close().

Closing the consumer will commit off‐ sets if needed and will send the group coordinator a message that the consumer is leaving the group. The consumer coordinator will trigger rebalancing immediately

Full example is here https://github.com/gwenshap/kafka-examples/blob/master/SimpleMovingAvg/src/main/java/com/shapira/examples/newconsumer/simplemovingavg/SimpleMovingAvgNewConsumer.java

Runtime.getRuntime().addShutdownHook(new Thread() {
 public void run() {
 System.out.println("Starting exit...");
 consumer.wakeup(); 
 try {
 mainThread.join();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 });
...

try {
 // looping until ctrl-c, the shutdown hook will
 cleanup on exit
 while (true) {
 ConsumerRecords<String, String> records =
 movingAvg.consumer.poll(1000);
 System.out.println(System.currentTimeMillis() + "
 -- waiting for data...");
 for (ConsumerRecord<String, String> record :
 records) {
 System.out.printf("offset = %d, key = %s,
 value = %s\n",
 record.offset(), record.key(),
 record.value());
 }
 for (TopicPartition tp: consumer.assignment())
 System.out.println("Committing offset at
 position:" +
 consumer.position(tp));
 movingAvg.consumer.commitSync();
 }
 } catch (WakeupException e) {
 // ignore for shutdown 
 } finally {
 consumer.close(); 
 System.out.println("Closed consumer and we are done");
 }
 }

Upvotes: 2

Related Questions