Reputation: 1182
I am working on an error handling implementation and have a question.
Let me explain the problem:
I receive a batch of messages, do a DB lookup for each one in a for loop, collect all the looked-up objects in a list, and then call a bulk insert stored proc and a bulk update stored proc with this list of objects.
Now assume some exception happens during a lookup. I want to retry that message, so I tried to use DefaultErrorHandler. But there is a problem: according to the docs, when a BatchListenerFailedException with an element index is thrown, it commits the offsets of the records before the index. As I said, I need to execute the bulk insert and update after the lookups, so I don't want to commit the offsets before the index; those records haven't been inserted/updated in the DB yet.
Does that mean my only option is RetryingBatchErrorHandler, which retries the whole batch every time? Can I somehow continue processing the messages that don't produce an error?
Also, if RetryingBatchErrorHandler is the only option, how can I be sure that with a long backoff period (exponential backoff) Kafka won't kick my consumer out of the group and initiate a rebalance?
My current implementation:
RetryingBatchErrorHandler retryingBatchErrorHandler =
        new RetryingBatchErrorHandler(backoff, (consumerRecord, e) ->
                log.error("Backoff attempts exhausted for the record with offset={}, partition={}, value={}, offset committed.",
                        consumerRecord.offset(), consumerRecord.partition(), consumerRecord.value()));
factory.setBatchErrorHandler(retryingBatchErrorHandler);
UPDATE: see the comments on Artem's answer.
This is how the lookup step can be wrapped in a RetryTemplate:
LookedUpRequest lookedUpRequest = retryTemplate.execute(ctx -> {
    // Lookup step
    return lookup.process(request);
});
If it still fails, the exception is propagated to the batch error handler, where RetryingBatchErrorHandler retries the whole batch according to its policy.
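To make the mechanics concrete, here is a minimal plain-Java sketch of what wrapping the lookup in a retry does: attempt the call up to a maximum number of times with a backoff, and rethrow the last exception so the batch error handler takes over. This is my own illustration, not Spring Retry's actual implementation; the name `retryLookup` and the fixed backoff are assumptions.

```java
import java.util.concurrent.Callable;

public class LookupRetry {

    // Retry the lookup up to maxAttempts times, sleeping backoffMs between
    // attempts; if all attempts fail, rethrow the last exception so the
    // batch error handler can handle it.
    public static <T> T retryLookup(Callable<T> lookup, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.call();
            } catch (Exception e) {
                last = e;                    // remember the failure
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs); // wait before the next attempt
                }
            }
        }
        throw last; // exhausted: propagate to the batch error handler
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // A lookup that fails twice, then succeeds on the third attempt.
        String result = retryLookup(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("transient");
            return "looked-up";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the real listener the `Callable` body would be the `lookup.process(request)` call from the snippet above.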
Upvotes: 0
Views: 1693
Reputation: 121552
See its JavaDocs:
/**
* A batch error handler that invokes the listener according to the supplied
* {@link BackOff}. The consumer is paused/polled/resumed before each retry in order to
* avoid a rebalance. If/when retries are exhausted, the provided
* {@link ConsumerRecordRecoverer} is invoked for each record in the batch. If the
* recoverer throws an exception, or the thread is interrupted while sleeping, seeks are
* performed so that the batch will be redelivered on the next poll.
*
* @author Gary Russell
* @since 2.3.7
*
*/
public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
implements ListenerInvokingBatchErrorHandler {
So, no rebalance, because the consumer is paused in between retries.
You might want to cache the results of your lookup step, so you don't stress the DB with records you already looked up before the failure.
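One simple way to do that (my own sketch, not part of the answer) is to memoize lookups in a map keyed by the record, so a retried batch only hits the DB for records that were not resolved before the failure. The key/value types and `dbLookup` function here are placeholders.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class LookupCache {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Return the cached result, or perform the DB lookup and cache it.
    public String lookup(String key, Function<String, String> dbLookup) {
        return cache.computeIfAbsent(key, dbLookup);
    }

    public static void main(String[] args) {
        LookupCache cache = new LookupCache();
        int[] dbCalls = {0};
        Function<String, String> db = k -> { dbCalls[0]++; return "row-" + k; };
        cache.lookup("a", db);
        cache.lookup("a", db); // served from the cache, no second DB call
        System.out.println("dbCalls=" + dbCalls[0]);
    }
}
```

The cache should be scoped to the batch (or given an eviction policy) so it does not grow unbounded across batches.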
You may also consider doing the retry around that lookup yourself; see RetryTemplate. But in that case you need to be sure that the whole operation doesn't take too long (see max.poll.interval.ms), or the consumer will leave its group.
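To sanity-check that, you can do rough arithmetic on the worst-case in-listener retry time and compare it to max.poll.interval.ms (the Kafka default is 300000 ms, i.e. 5 minutes). This is my own illustration; the backoff values are examples, not recommendations.

```java
public class PollBudget {

    // Total worst-case sleep across retries with exponential backoff:
    // initial, initial*multiplier, initial*multiplier^2, ... each step
    // capped at maxMs per attempt.
    public static long worstCaseMs(long initialMs, double multiplier, long maxMs, int retries) {
        long total = 0;
        double interval = initialMs;
        for (int i = 0; i < retries; i++) {
            total += (long) Math.min(interval, maxMs);
            interval *= multiplier;
        }
        return total;
    }

    public static void main(String[] args) {
        long budget = 300_000; // Kafka's default max.poll.interval.ms
        // 5 retries at 1s, 2s, 4s, 8s, 16s = 31s of backoff sleep
        long worst = worstCaseMs(1_000, 2.0, 30_000, 5);
        System.out.println("worst-case retry sleep: " + worst + " ms, poll budget: " + budget + " ms");
    }
}
```

Remember to also add the time of the lookups themselves (attempts × DB latency) on top of the backoff sleep when comparing against the budget; only the in-listener retry counts against max.poll.interval.ms, since RetryingBatchErrorHandler pauses the consumer between its own retries.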
Upvotes: 3