Reputation: 2871
We are reading messages from a Kafka-topic. I was under the (false) impression that by setting the EnableAutoOffsetStore/enable.auto.offset.store = false you, as a consumer could choose when you wanted to move the offset.
We were using this like so
while (!cancellationToken.IsCancellationRequested)
{
try{
var consumeResult = kafkaConsumer.Consume(cancellationToken);
// process consumeResult.Message
kafkaConsumer.StoreOffset(consumeResult);
}
catch
{
// delay and re-try
}
}
In this way we could re-read the message if we got an exception. We have 1 producer, 1 consumer,1 topic, 1 partition (and 1 group)
How should this behavior be achieved?
Upvotes: 2
Views: 1957
Reputation: 191854
If you catch an exception in process
, then the commit will never happen.
You can store the partition+offset for the record that failed and Seek
back to that offset.
Alternatively, you can skip over the offset in your main processing loop and write that record to a dead-letter topic and write a different consumer to "process again/differently"
Upvotes: 3