Reputation: 823
What I've to do is pause the KafkaConsumer
if during message consuming an error is thrown.
This is what I wrote
@KafkaListener(...)
public void consume(
@Header(KafkaHeaders.CONSUMER) KafkaConsumer<String,String> consumer,
@Payload String message) {
try {
//consumer message
} catch(Exception e) {
saveConsumer(consumer);
consumer.pause();
}
}
Then I wrote a REST service in order to resume the consumer
@RestController
@RequestMapping("/consumer")
class ConsumerRestController {
@PostMapping("/resume")
public void resume() {
KafkaConsumer<String,String> consumer = getConsumer();
if(consumer != null) {
consumer.resume(consumer.paused());
}
}
}
Now, I've two questions.
First question: When I call consumer.pause() from @KafkaListener
annotated method what happens?
Consumer is immediately paused or I can receive other messages associated on other offset of same topic-partition.
For example, I have "message1" with offset 3 and "message2" with offset 4, "message1" cause an exception, what happens to "message2"? Is it consumed anyway?
Second question: Resuming the consumer from REST service give a ConcurrentModificationException
because KafkaConsumer
is not thread safe. So, how come I have to do this?
Upvotes: 2
Views: 5195
Reputation: 174554
Do not pause the consumer directly; pause the container instead.
@KafkaListener(id = "foo", ...)
@Autowired KafkaListenerEndpointRegistry;
...
registry.getListenerContainer("foo").pause();
The pause will take effect before the next poll; if you want to immediately pause (and not process the remaining records from the last poll), throw an exeption after pausing (assuming you are using the, now default, SeekToCurrentErrorHandler
.
Upvotes: 3