Guillaume

Reputation: 494

Kafka consumer error handling: confusion between ErrorHandler and RetryTemplate

I am trying to set up the following error handling when I consume my messages:

What I have (Spring Kafka 2.5.5.RELEASE with Kafka client 2.5.1) is the following:

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory() throws ExceptionTechnique {
    ConcurrentKafkaListenerContainerFactory<String, MyObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(RECORD);
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(context -> {
        Object record = context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        LOGGER_TECHNIQUE.error("Fail to handle message after {} retries. {}",
                context.getRetryCount(), record);
        return record;
    });
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaOperations,
                    (cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition())),
            new FixedBackOff(10000L, 2L)));
    factory.setConcurrency(kafkaListenerConcurrency);
    factory.setStatefulRetry(true);

    return factory;
}

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(retryPolicy());
    return retryTemplate;
}

private BackOffPolicy backOffPolicy() {
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(10000);
    return fixedBackOffPolicy;
}

private SimpleRetryPolicy retryPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

    exceptionMap.put(IllegalArgumentException.class, false);
    exceptionMap.put(TimeoutException.class, false);
    exceptionMap.put(ListenerExecutionFailedException.class, true);
    // Custom exception
    exceptionMap.put(MyTechnicalException.class, false);
    exceptionMap.put(MyFunctionalException.class, false);

    return new SimpleRetryPolicy(3, exceptionMap, true);
}

Now, if I send a non-serializable message, it goes to the DLT without any retry -> OK!

In my @KafkaHandler, I throw a new MyTechnicalException, which is caught and rethrown.

I expected no retry, but I get 2 retries, 20 seconds apart (instead of 10?), and the message is sent to the DLT after those 2 retries.

If I remove the error handler, no surprise: 3 attempts, and my error log message is displayed... but I need to send the message to the DLQ.

If I remove the RetryTemplate and the RecoveryCallback, no surprise either, but all exceptions are retried.

Questions:

  1. Is there a way to handle my use case? How?
  2. What is the difference between RecoveryCallback and ErrorHandler in the factory configuration?
  3. How does retrying based on the exception type work? Does it work with our own exceptions?

Upvotes: 0

Views: 3299

Answers (1)

Gary Russell

Reputation: 174554

Retries at the listener level have been deprecated, since the error handlers have evolved to cover all the functionality that the RetryTemplate provides: back off, exception classification, etc.

https://github.com/spring-projects/spring-kafka/issues/1886
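
To make the idea of "exception classification" in a single error handler concrete, here is a minimal, framework-free sketch. It is not Spring Kafka code and does not reproduce its exact rules: every class and method name below (`ClassifiedRetryDemo`, `isRetryable`, `deliver`, `TechnicalException`, `WrapperException`) is hypothetical and only models the behaviour. It assumes a classifier that walks the cause chain, so an exception wrapped by the framework can still be matched, and a handler that either retries up to a limit or hands the failure to a dead-letter sink. For the real configuration, check the Javadoc of the error handler in your Spring Kafka version.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ClassifiedRetryDemo {

    /** Hypothetical stand-in for the question's MyTechnicalException. */
    public static class TechnicalException extends RuntimeException {
        public TechnicalException(String message) {
            super(message);
        }
    }

    /** Hypothetical wrapper, playing the role of a framework exception around a listener failure. */
    public static class WrapperException extends RuntimeException {
        public WrapperException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Walks the cause chain from the outermost exception inwards and returns the
     * first explicit rule that matches; falls back to defaultValue if none matches.
     */
    public static boolean isRetryable(Throwable failure,
                                      Map<Class<? extends Throwable>, Boolean> rules,
                                      boolean defaultValue) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, Boolean> rule : rules.entrySet()) {
                if (rule.getKey().isInstance(t)) {
                    return rule.getValue();
                }
            }
        }
        return defaultValue;
    }

    /**
     * Invokes the listener. Retryable failures are redelivered up to maxRetries times;
     * non-retryable or exhausted failures go to deadLetters. Returns the delivery count.
     */
    public static int deliver(Runnable listener,
                              int maxRetries,
                              Map<Class<? extends Throwable>, Boolean> rules,
                              List<Throwable> deadLetters) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                listener.run();
                return deliveries;
            } catch (RuntimeException e) {
                boolean exhausted = deliveries > maxRetries;
                if (exhausted || !isRetryable(e, rules, true)) {
                    deadLetters.add(e); // models publishing to a dead-letter topic
                    return deliveries;
                }
                // A real handler would wait for its back-off interval here.
            }
        }
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> rules = new LinkedHashMap<>();
        rules.put(TechnicalException.class, false); // never retry this type

        List<Throwable> deadLetters = new ArrayList<>();

        // Wrapped non-retryable exception: dead-lettered on the first delivery.
        int first = deliver(() -> {
            throw new WrapperException(new TechnicalException("boom"));
        }, 2, rules, deadLetters);

        // Unclassified exception: initial delivery plus 2 retries, then dead-lettered.
        int second = deliver(() -> {
            throw new IllegalStateException("transient");
        }, 2, rules, deadLetters);

        System.out.println(first + " " + second + " " + deadLetters.size());
    }
}
```

The design point is that one component owns both decisions, whether to retry and where the record goes afterwards. With two separate retry layers, each applies its own classification and its own back off.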

Upvotes: 1
