Reputation: 1639
Let's say we are using Kafka with manual commits. We process each incoming message, but if processing fails for any reason we want to re-read the message and ensure that, as long as it has not been committed, it will be reprocessed.
while True:
    try:
        msg = consumer.poll()
        out_msg = process(msg)
        consumer.commitAsync()
    except Exception:
        print('error occurred, not committing')
If the topic contains 1 2 3 4 5 6 7 and failures start happening from offset 4 onwards, how do I keep reprocessing 4 5 6 7? I don't need to stay stuck on 4 just because it is the first failure, but that would be fine as well.
If I have this basic consume loop, I want to reprocess the same message multiple times until I can commit it. Whether I use commitAsync() or commitSync() does not matter to me, as long as I eventually commit every message. What Kafka setup is required to achieve this?
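For concreteness, the consumer configuration this loop presupposes could look like the minimal sketch below, using the kafka-python client with auto-commit disabled; the client library, broker address, topic name, and group id are illustrative assumptions, not something stated in the question.

from kafka import KafkaConsumer

# auto-commit disabled, so offsets move forward only when the code
# explicitly calls commit() or commit_async()
consumer = KafkaConsumer(
    'numbers',                           # assumed topic holding 1 2 3 4 5 6 7
    bootstrap_servers='localhost:9092',  # assumed broker address
    group_id='reprocess-demo',           # assumed consumer group
    enable_auto_commit=False,
    auto_offset_reset='earliest',
)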
Upvotes: 1
Views: 1494
Reputation: 504
Failing to commit multiple times will leave your consumer stuck at the same position. But if you don't want to stay stuck at 4 and instead want to process the remaining messages (5, 6, 7) as well, you can achieve that by committing (sync or async) only once, after processing all the messages returned by a single poll.
In the above example, a failure at offset 4 would then re-process all the messages polled at that time, so the messages processed would be 1, 2, 3, 4, 5, 6, 7.
consumer.poll() can return more than one message; how many is controlled by max.poll.records. Let's assume you have configured it to 10.
In the above example, the code changes a bit:
msgs = consumer.poll()
# inner loop: process every message in the batch
for msg in msgs:
    out_msg = process(msg)
# after the inner loop: commit once for the whole batch
consumer.commitAsync()
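A fleshed-out version of that pattern could look like the following sketch. It assumes a kafka-python consumer created as in the sketch under the question, additionally configured with max_poll_records=10, and process() stands in for the question's processing function.

def process(record):
    return record.value              # placeholder for the question's logic

while True:
    # kafka-python's poll() returns {TopicPartition: [ConsumerRecord, ...]}
    batch = consumer.poll(timeout_ms=1000)
    try:
        for records in batch.values():
            for record in records:
                out_msg = process(record)
        if batch:
            # commit once, only after every message in the batch succeeded
            consumer.commit_async()
    except Exception:
        # nothing committed: after a restart or rebalance the consumer resumes
        # from the last committed offset and reads the failed batch again
        print('error occurred, not committing')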
There is no single solution; this needs to be handled in your code rather than in the Kafka broker. Possible approaches:
1. Identify the root cause of the repeated failures for a message and fix it. For example, for an out-of-memory issue, increase the memory; otherwise the message will keep failing.
2. Create a table, e.g. FAIL_MESSAGE with columns (offset, retry_count). The idea is to keep a count of attempts, and once the number of attempts crosses a threshold you can move the message to FAIL_MESSAGE_LOG and stop re-processing it.
3. Dead letter queue - similar to option 2. You probably still want to track the number of attempts, otherwise the message will be retried forever (a sketch of options 2 and 3 follows below).
Option 1 is the preferred approach.
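As a rough sketch of options 2 and 3 combined: an in-memory dict stands in for the FAIL_MESSAGE table, 'numbers-dlq' is an assumed dead-letter topic name, and process() is the question's processing function. handle() would replace the bare process() call inside the batch loop above, and the caller would commit only when every record in the polled batch returned True.

from kafka import KafkaProducer

MAX_ATTEMPTS = 3                     # assumed retry limit
retry_count = {}                     # (topic, partition, offset) -> failed attempts
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handle(record):
    """Return True when it is safe to commit past this record."""
    key = (record.topic, record.partition, record.offset)
    try:
        process(record)              # the question's processing function
        retry_count.pop(key, None)
        return True
    except Exception:
        retry_count[key] = retry_count.get(key, 0) + 1
        if retry_count[key] >= MAX_ATTEMPTS:
            # give up: park the raw payload on a dead-letter topic and move on
            producer.send('numbers-dlq', value=record.value)
            return True
        return False                 # leave uncommitted so the message is retried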
Upvotes: 1
Reputation: 191710
You'll need to break the loop to stop polling on an exception; otherwise the next batch of offsets will be polled and the next successful commit will skip over the offsets you wanted to read. (Maybe this is what you want?)
After you break the loop, you can kill the script, then edit your code to fix whatever exception you get, and redeploy the consumer. Since the commit didn't happen, it'll start polling from the failed position.
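Spelled out (reusing the kafka-python consumer and process() from the sketches further up the page), that first option is simply:

# option 1: stop consuming on the first failure; nothing is committed for the
# failed batch, so a fixed and redeployed consumer re-reads the failed message
while True:
    try:
        batch = consumer.poll(timeout_ms=1000)
        for records in batch.values():
            for record in records:
                process(record)
        if batch:
            consumer.commit()
    except Exception as exc:
        print('processing failed, not committing:', exc)
        break                        # stop polling; fix the code, then redeploy

consumer.close()                     # leave the group; offsets stay at the last commit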
Your other option would be to use a dead letter queue (write failed events to a new topic to process later). Then you need multiple consumer apps running to process each topic (or subscribe to both and write if statements against the topic name).
The last option, which would get you into an infinite retry loop, is to seek the consumer back to the failed offset and retry the process function, then poll again.
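A sketch of that seek-and-retry variant, again assuming the kafka-python consumer and process() from the earlier sketches:

# on failure, rewind the partition so the very next poll() returns the failed
# message again -- deliberately an infinite retry loop until it succeeds
while True:
    batch = consumer.poll(timeout_ms=1000)
    failed = False
    for tp, records in batch.items():
        for record in records:
            try:
                process(record)
            except Exception:
                # move this partition's position back to the failed record
                consumer.seek(tp, record.offset)
                failed = True
                break
    if batch and not failed:
        consumer.commit()            # commit only when the whole batch succeeded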
Upvotes: 1