mawek

Reputation: 709

Committing offsets after consumer error handling in spring-kafka

I don't fully understand how consumer error handling interacts with offset committing and AckMode, and how it is affected by stopping containers on error (using spring-kafka 1.3.*).

Let's say I have two consumers (consuming two partitions, one each), and each fetches 5 events from its partition per poll (max.poll.records=5).

The first consumer processes its 1st event OK, but processing the 2nd event fails, so in the error handler I call kafkaListenerEndpointRegistry.stop(). However, since stop() is implemented so that it only prevents the consumers from polling again, both consumers still finish processing their current batches. So the first consumer goes on to process events 3, 4, and 5 (all without errors), and let's say the second consumer fails on its 4th event (events 1, 2, 3, and 5 were processed OK). My question is: what offsets will be committed for each consumer?
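To make the scenario concrete, here is a minimal sketch of the kind of error handler described above, against the spring-kafka 1.3 ErrorHandler contract. The class name is made up for illustration; note that registry.stop() only stops further polling, so in-flight batch records are still delivered.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ErrorHandler;

// Hypothetical example: stop all listener containers when any record fails.
public class StoppingErrorHandler implements ErrorHandler {

    private final KafkaListenerEndpointRegistry registry;

    public StoppingErrorHandler(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        // Prevents further polls, but records already fetched in the current
        // batch are still handed to the listener on all containers.
        this.registry.stop();
    }
}
```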

My understanding is:

Is my understanding correct?

Upvotes: 3

Views: 5814

Answers (1)

Gary Russell

Reputation: 174739

Your understanding is correct; when you stop the container you should also signal to the listener that it needs to reject any remaining records, so their offsets won't be committed.

We are considering adding a stopNow() method, which would prevent additional records from being sent to the listener.

In 2.0, we added the RemainingRecordsErrorHandler (and an implementation, SeekToCurrentErrorHandler). When the container detects such an error handler, it presents the remaining records to the error handler instead of the listener.

The SeekToCurrentErrorHandler seeks all the topic/partitions to the unprocessed offset (including the failed record) so they are all retrieved on the next poll.
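A minimal wiring sketch for the 2.0 behavior described above, assuming a consumerFactory bean exists elsewhere (the bean method name is arbitrary):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Remaining records (including the failed one) are seeked back to and
    // re-fetched on the next poll instead of being delivered to the listener.
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}
```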

A custom implementation might seek the remaining records, but send the failed record to a dead-letter topic (or otherwise dispose of it).

That said, stopNow() will probably be easier for most folks to deal with, but it will likely be a 2.2 feature only; 1.3.x users need to discard/reject unprocessed records after a failure.

You could also use a RetryingMessageListenerAdapter (or enable retry if using @KafkaListener), which will retry the delivery according to its retry configuration, without involving Kafka at all. The failed record can be disposed of via a RecoveryCallback after retries are exhausted, and its offset is committed then; the container doesn't need to be stopped in that case.
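The retry route can be sketched like this for a @KafkaListener, again assuming a consumerFactory bean exists elsewhere (the bean method name is arbitrary; the default RetryTemplate makes three attempts):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setRetryTemplate(new RetryTemplate());
    RecoveryCallback<Object> recovery = context -> {
        ConsumerRecord<?, ?> failed = (ConsumerRecord<?, ?>)
                context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        // Dispose of the failed record here (log it, send to a dead-letter
        // topic, etc.); returning normally lets the container commit the
        // offset and keep running.
        return null;
    };
    factory.setRecoveryCallback(recovery);
    return factory;
}
```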

Upvotes: 6
