Boris Mitioglov
Boris Mitioglov

Reputation: 1182

Proper error handling for kafka batch listener

I am working on error handling implementation and have a question.
Let me explain the problem:
I receive a batch of messages, I do the DB lookup of each one in the for loop, then I need to collect all the looked up objects in the list and call bulk insert stored proc and bulk update stored proc with this list of objects.

Now let's assume there is some exception happened during lookup. I want to retry this message. For this case I tried to use DefaultErrorHandler. But there is a problem, according to the doc, when BatchListenerFailedException with element index is thrown it commits the offsets of the records before the index. But as I said I need to execute bulk insert and update after lookup so I don't want to commit offsets before the index, these records weren't inserted/updated to the DB yet.

Does it mean that my only option is to use RetryingBatchErrorHandler that retries the whole batch every time? Can I somehow continue processing messages that don't produce an error?

Also if RetryingBatchErrorHandler is the only option, how could I be sure that in case of long backoff period (exponential backoff), kafka won't kill my consumer and won't initiate rebalance?

My current implementation:

RetryingBatchErrorHandler retryingBatchErrorHandler =
                new RetryingBatchErrorHandler(backoff,
                        (consumerRecord, e) ->
                                log.error("Backoff attempts exhausted for the record with offset={}, partition={}, value={}, offset committed.",
                consumerRecord.offset(), consumerRecord.partition(), consumerRecord.value()));

factory.setBatchErrorHandler(retryingBatchErrorHandler);

UPDATE: see comments in the Artem's answer.
This is how lookup step can be wrapped into retryTemplate

LookedUpRequest lookedUpRequest = retryTemplate.execute(ctx -> {
   //Lookup step
   return lookup.process(request);
});

and if it fails then it will throw the exception further for batch error handler where RetryingBatchErrorHandler retry the batch according to its policy

Upvotes: 0

Views: 1693

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121552

See its JavaDocs:

/**
 * A batch error handler that invokes the listener according to the supplied
 * {@link BackOff}. The consumer is paused/polled/resumed before each retry in order to
 * avoid a rebalance. If/when retries are exhausted, the provided
 * {@link ConsumerRecordRecoverer} is invoked for each record in the batch. If the
 * recoverer throws an exception, or the thread is interrupted while sleeping, seeks are
 * performed so that the batch will be redelivered on the next poll.
 *
 * @author Gary Russell
 * @since 2.3.7
 *
 */
public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
        implements ListenerInvokingBatchErrorHandler {

So, no rebalance because consumer is paused in between.

You might need to think about something caching for your lookup part, so you don't stress DB with those records which you have requested already before the failure.

You also may think about doing a retry around that lookup yourself. See RetryTemplate. But in this case you need to be sure that the whole operation is not long enough (see max.poll.interval.ms) to make a consumer to leave its group.

Upvotes: 3

Related Questions