Our Kafka consumer receives up to 500 messages from each poll(). We have disabled auto-commit (enable.auto.commit = false).
Assume we have processed the first 100 messages successfully, so the committed offset is also at 100.
Now the 101st message fails with an error, so we don't commit its offset.
But because we already have all 500 messages in memory, we move on to the 102nd message, process it successfully, and commit the offset for the 102nd message.
Que:
If you don't break out of the loop over the polled record batch when the exception is thrown, the failed record is effectively dropped, and you've implemented "at most once" processing.
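To make the "at most once" outcome concrete, here is a minimal in-memory simulation. It is not the Kafka client API: `FakePartition`, `process` and `consume_at_most_once` are hypothetical names, the list index stands in for the offset, and the committed value follows Kafka's convention of being the offset of the *next* record to read.

```python
from dataclasses import dataclass


@dataclass
class FakePartition:
    """Hypothetical stand-in for one Kafka partition (list index == offset)."""
    records: list
    committed: int = 0  # Kafka convention: offset of the NEXT record to read

    def poll(self, position, max_records=500):
        """Return up to max_records (offset, value) pairs starting at position."""
        end = min(position + max_records, len(self.records))
        return [(off, self.records[off]) for off in range(position, end)]

    def commit(self, offset):
        self.committed = offset


def process(value):
    # Hypothetical processing step that fails on a "poison" record.
    if value == "bad":
        raise ValueError("cannot process record")
    return value


def consume_at_most_once(partition):
    processed = []
    for offset, value in partition.poll(partition.committed):
        try:
            processed.append(process(value))
        except ValueError:
            continue  # error swallowed: the loop simply moves on
        partition.commit(offset + 1)  # committing 102 means storing 103
    return processed


records = [f"msg-{i}" for i in range(105)]
records[101] = "bad"  # the record at offset 101 fails
partition = FakePartition(records)
processed = consume_at_most_once(partition)
print(len(processed), partition.committed)  # 104 105
```

Once offset 103 or later has been committed, a restarted consumer resumes from the committed offset, so offset 101 is never delivered again.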
The data is still available in the broker, though. To get it back you'd need to rewind your consumer with the seek method to offset 101 and poll again from there. You'd then end up with "at least once" processing, since offsets 101 and 102 would each be processed more than once. You'd need to update your processing logic so that reprocessing a record (i.e. making it idempotent) won't cause unintended side effects or errors.
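The rewind can be sketched with the same kind of in-memory simulation. `seek` here only mimics the idea of moving the fetch position (as the real `KafkaConsumer.seek()` does). `IdempotentSink` and `run_batch` are hypothetical, and "idempotent" is implemented as "ignore an offset that has already been applied", which is one possible choice among several.

```python
class FakePartition:
    """Hypothetical in-memory partition with a fetch position and a committed offset."""

    def __init__(self, records):
        self.records = records
        self.position = 0   # next offset poll() will return
        self.committed = 0  # next offset to read after a restart

    def seek(self, offset):
        # Mimics the idea of KafkaConsumer.seek(): move the fetch position.
        self.position = offset

    def poll(self, max_records=500):
        end = min(self.position + max_records, len(self.records))
        batch = [(off, self.records[off]) for off in range(self.position, end)]
        self.position = end
        return batch

    def commit(self, offset):
        self.committed = offset


class IdempotentSink:
    """Hypothetical downstream store that ignores offsets it has already applied."""

    def __init__(self):
        self.applied = {}
        self.attempts = 0

    def apply(self, offset, value):
        self.attempts += 1
        if offset in self.applied:
            return  # duplicate delivery: no second side effect
        self.applied[offset] = value


def run_batch(partition, sink, failing_offsets):
    for offset, value in partition.poll():
        if offset in failing_offsets:
            failing_offsets.discard(offset)  # simulate a one-off failure
            continue
        sink.apply(offset, value)
        partition.commit(offset + 1)


partition = FakePartition([f"msg-{i}" for i in range(103)])
sink = IdempotentSink()
run_batch(partition, sink, failing_offsets={101})  # 101 fails, 102 commits 103
partition.seek(101)                                # rewind to the failed record
run_batch(partition, sink, failing_offsets=set())  # 101 and 102 delivered again
print(len(sink.applied), sink.attempts, partition.committed)  # 103 104 103
```

Offset 102 reaches the sink twice (104 attempts for 103 records), but the duplicate has no effect because the sink deduplicates by offset.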
Besides crashing the consumer and letting it restart and poll again (and possibly crash again and again), you can wrap the processing in a try-except and, in the except branch, use a producer to push the failed record to a separate topic (called a dead letter topic). You then need to write more consumer code to inspect and handle those events.
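A minimal sketch of the try-except routing, again with an in-memory stand-in rather than a real client. `FakeProducer`, `process`, `consume` and the topic name `events-dlt` are all hypothetical.

```python
from collections import defaultdict


class FakeProducer:
    """Hypothetical in-memory stand-in for a Kafka producer."""

    def __init__(self):
        self.topics = defaultdict(list)

    def send(self, topic, value):
        self.topics[topic].append(value)


def process(value):
    # Hypothetical processing step: only numeric strings are valid.
    if not value.isdigit():
        raise ValueError(f"not a number: {value!r}")
    return int(value) * 2


def consume(batch, producer, dlt_topic="events-dlt"):
    results = []
    next_offset = None
    for offset, value in batch:
        try:
            results.append(process(value))
        except ValueError as exc:
            # Park the failed record, with some context, on the dead letter topic.
            producer.send(dlt_topic, {"offset": offset, "value": value, "error": str(exc)})
        next_offset = offset + 1  # success or dead-lettered: the record is handled
    return results, next_offset   # the offset you would then commit


producer = FakeProducer()
results, to_commit = consume([(100, "1"), (101, "x"), (102, "3")], producer)
print(results, to_commit)  # [2, 6] 103
```

With a real producer you would likely want the dead letter write acknowledged (for example by flushing the producer) before committing, otherwise a crash in between could lose the record.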
If you commit that you have successfully processed the message at offset 102 (so you commit offset 103), you will not receive any message prior to that offset ever again unless you manually seek to a previous position.
A common way to deal with this is to implement a dead letter queue (DLQ); the pattern is widely documented. In a nutshell, if the error is not retriable (e.g. invalid data), you send the message to another topic so you can troubleshoot it manually while the main application continues processing newer messages. This approach requires some kind of alarm that notifies you when messages are sent to the DLQ, plus tooling to inspect those messages and resend them to the main topic if necessary (e.g. after fixing the bug).
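The retriable vs. non-retriable split and the "resend after fixing the bug" step might look like the sketch below. The exception classes, `handle`, `replay` and the topic names are hypothetical. Re-raising after the last retry (so the consumer fails without committing) is one design choice, not the only one.

```python
from collections import defaultdict


class TransientError(Exception):
    """Hypothetical: e.g. a downstream timeout; worth retrying."""


class InvalidDataError(Exception):
    """Hypothetical: the record itself is bad; retrying will not help."""


class FakeProducer:
    """Hypothetical in-memory stand-in for a Kafka producer."""

    def __init__(self):
        self.topics = defaultdict(list)

    def send(self, topic, value):
        self.topics[topic].append(value)


def handle(record, process, producer, dlq_topic, max_retries=3):
    for attempt in range(1, max_retries + 1):
        try:
            process(record)
            return "processed"
        except TransientError:
            if attempt == max_retries:
                raise  # give up: let the consumer fail without committing
        except InvalidDataError:
            producer.send(dlq_topic, record)  # not retriable: park it
            return "dead-lettered"


def replay(producer, dlq_topic, main_topic):
    """After the bug is fixed, move parked records back to the main topic."""
    parked = producer.topics.pop(dlq_topic, [])
    for record in parked:
        producer.send(main_topic, record)
    return len(parked)


calls = {"n": 0}


def flaky(record):
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("timeout")  # fails twice, then succeeds


def strict(record):
    if record.get("amount") is None:
        raise InvalidDataError("missing amount")


producer = FakeProducer()
print(handle({"id": 1}, flaky, producer, "payments-dlq"))   # processed
print(handle({"id": 2}, strict, producer, "payments-dlq"))  # dead-lettered
print(replay(producer, "payments-dlq", "payments"))         # 1
```

The alarm mentioned above could hook into the `InvalidDataError` branch, for example by incrementing a metric that triggers an alert.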