Reputation: 494
I am trying to set up the following error handling when I consume my messages:
What I have (Spring Kafka 2.5.5.RELEASE with Kafka client 2.5.1) is the following:
    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory() throws ExceptionTechnique {
        ConcurrentKafkaListenerContainerFactory<String, MyObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(RECORD);
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(context -> {
            Object record = context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
            LOGGER_TECHNIQUE.error("Fail to handle message after {} retries. {}",
                    context.getRetryCount(), record);
            return record;
        });
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations,
                (cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition())),
                new FixedBackOff(10000L, 2L)));
        factory.setConcurrency(kafkaListenerConcurrency);
        factory.setStatefulRetry(true);
        return factory;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(backOffPolicy());
        retryTemplate.setRetryPolicy(retryPolicy());
        return retryTemplate;
    }

    private BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(10000);
        return fixedBackOffPolicy;
    }

    private SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, false);
        exceptionMap.put(ListenerExecutionFailedException.class, true);
        // Custom exception
        exceptionMap.put(MyTechnicalException.class, false);
        exceptionMap.put(MyFunctionalException.class, false);
        return new SimpleRetryPolicy(3, exceptionMap, true);
    }
Now, if I send a non-serializable message, it goes to the DLT without any retry -> OK!
In my @KafkaHandler, I have a throw new MyTechnicalException, which is caught and rethrown.
I should get no retry, but I got 2 retries, 20 seconds apart (instead of 10?), and the message was sent to the DLT after those 2 retries.
If I remove the error handler, no surprise: 3 attempts, and my error log message is displayed... but I need to send to the DLQ.
If I remove the RetryTemplate and RecoveryCallback, no surprise either, but all exceptions are retried.
Questions:
Upvotes: 0
Views: 3299
Reputation: 174554
Retries at the listener level have been deprecated since the error handlers have evolved to cover all the functionality that the RetryTemplate provides: back off, exception classification, etc.
https://github.com/spring-projects/spring-kafka/issues/1886
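As a rough sketch of what that could look like for the configuration in the question: the RetryTemplate, RecoveryCallback and stateful retry are dropped, and the SeekToCurrentErrorHandler alone does back off, classification and dead-lettering. This is a sketch under assumptions, not a verified drop-in. MyObject, MyTechnicalException, MyFunctionalException and the "myTopicToRead.dlq" topic are the question's own names, the class name KafkaErrorHandlingConfig and the injected bean parameters are made up for illustration, and the custom exceptions are assumed to extend Exception. It uses addNotRetryableException as it exists in the 2.5.x line; I believe later versions offer a varargs addNotRetryableExceptions instead.

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; MyObject and the custom exceptions come from the question.
@Configuration
public class KafkaErrorHandlingConfig {

    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory(
            ConsumerFactory<String, MyObject> consumerFactory,
            KafkaOperations<Object, Object> kafkaOperations) {

        // Publish failed records to the same partition of the dead-letter topic.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaOperations,
                (cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition()));

        // 10 s between attempts, up to 2 retries (3 delivery attempts in total).
        SeekToCurrentErrorHandler errorHandler =
                new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(10000L, 2L));

        // Exception classification: these types skip the retries and go straight to the recoverer.
        errorHandler.addNotRetryableException(IllegalArgumentException.class);
        errorHandler.addNotRetryableException(MyTechnicalException.class);
        errorHandler.addNotRetryableException(MyFunctionalException.class);

        ConcurrentKafkaListenerContainerFactory<String, MyObject> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        // Only the error handler is set: no setRetryTemplate, setRecoveryCallback or setStatefulRetry.
        factory.setErrorHandler(errorHandler);
        return factory;
    }
}
```

With a single retry mechanism in place there is only one back off per failed delivery, and only one place where the retryable versus not-retryable decision is made.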
Upvotes: 1