Vin

Reputation: 823

Pause and Resume KafkaConsumer

I need to pause the KafkaConsumer when an error is thrown while consuming a message.

This is what I wrote:

@KafkaListener(...)
public void consume(
        @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
        @Payload String message) {

    try {
        // consume message
    } catch (Exception e) {
        saveConsumer(consumer);
        // pause() takes the partitions to pause; here, all assigned ones
        consumer.pause(consumer.assignment());
    }
}

Then I wrote a REST service to resume the consumer:

@RestController
@RequestMapping("/consumer")
class ConsumerRestController {
    @PostMapping("/resume")
    public void resume() {
        KafkaConsumer<String,String> consumer = getConsumer();
        if(consumer != null) {
            consumer.resume(consumer.paused());
        }
    }
}

Now, I have two questions. First question: what happens when I call consumer.pause() from the @KafkaListener annotated method? Is the consumer paused immediately, or can I still receive other messages at later offsets of the same topic-partition? For example, I have "message1" at offset 3 and "message2" at offset 4, and "message1" causes an exception. What happens to "message2"? Is it consumed anyway?

Second question: resuming the consumer from the REST service throws a ConcurrentModificationException because KafkaConsumer is not thread safe. So how should I do this?

Upvotes: 2

Views: 5195

Answers (1)

Gary Russell

Reputation: 174554

Do not pause the consumer directly; pause the container instead.

@KafkaListener(id = "foo", ...)
@Autowired KafkaListenerEndpointRegistry registry;

...

registry.getListenerContainer("foo").pause();

The pause takes effect before the next poll. If you want to pause immediately (and not process the remaining records from the last poll), throw an exception after pausing (assuming you are using the now-default SeekToCurrentErrorHandler).
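
To illustrate why "before the next poll" matters, and why requesting a pause through the container avoids the ConcurrentModificationException, here is a toy model in plain Java. It is not Spring Kafka's code: ToyContainer, pollUntilPausedOrDone and the batch contents are hypothetical names invented for this sketch. The assumption it models is that pause() and resume() only flip a flag, and that the single polling thread checks that flag between polls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: a hypothetical stand-in for a listener container,
// NOT Spring Kafka's implementation.
public class PauseFlagSketch {

    static class ToyContainer {
        // pause()/resume() touch only this flag, so any thread may call them.
        private final AtomicBoolean pauseRequested = new AtomicBoolean(false);
        private final List<List<String>> batches; // what each "poll" returns
        private final String failingMessage;      // message whose handling fails
        private int nextBatch = 0;
        final List<String> processed = new ArrayList<>();

        ToyContainer(List<List<String>> batches, String failingMessage) {
            this.batches = batches;
            this.failingMessage = failingMessage;
        }

        void pause()  { pauseRequested.set(true); }
        void resume() { pauseRequested.set(false); }

        // Meant to run on the single polling thread. The flag is checked
        // only before each poll, never in the middle of a batch.
        // (A real container keeps polling while paused; the toy just returns.)
        void pollUntilPausedOrDone() {
            while (nextBatch < batches.size() && !pauseRequested.get()) {
                List<String> batch = batches.get(nextBatch++);
                for (String msg : batch) {
                    processed.add(msg);
                    if (msg.equals(failingMessage)) {
                        pause(); // the listener hits an error and requests a pause
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyContainer container = new ToyContainer(
                List.of(List.of("message1", "message2"), List.of("message3")),
                "message1");

        container.pollUntilPausedOrDone();
        System.out.println(container.processed); // [message1, message2]

        container.resume(); // e.g. triggered from a REST endpoint
        container.pollUntilPausedOrDone();
        System.out.println(container.processed); // [message1, message2, message3]
    }
}
```

In this model "message2" is still handled, because it was already part of the batch returned by the last poll. That is the situation where throwing an exception after pausing is useful. The toy does not redeliver the failed message; that part is left to the error handler in a real setup.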

Upvotes: 3
