Reputation: 910
I have a Kafka consumer that reads messages from a topic and processes them. Processing is retried up to 5 times; if it still fails after the fifth attempt, the message should be written to another topic for future reference. My code looks like this:
KafkaConsumerConfig:

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
        factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
        return factory;
    }

KafkaConsumer:

    @Value("${kafka.consumer.failed.topic}")
    private String failedTopic;

    @KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.groupId}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void processMessage(String kafkaMessage) throws Exception {
        log.info("parsing new kafka message {}", kafkaMessage);
        TransactionDTO transactionDTO = TransformUtil.fromJson(kafkaMessage, TransactionDTO.class);
        try {
            service.parseTransactionDTO(transactionDTO);
        } catch (Exception e) {
            log.error(e.getMessage());
            kafkaTemplate.send(failedTopic, kafkaMessage);
            Thread.sleep(10000);
            throw e;
        }
    }

The consumer correctly makes 5 processing attempts with a 10-second delay between them, but every failed attempt writes a new message to the failed topic. Is there a way to write the message to the failed topic only once, after all retry attempts have been exhausted, instead of on every failure?
Upvotes: 0
Views: 340
Reputation: 174729
factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
Use a DeadLetterPublishingRecoverer instead of null in the error handler.
It was introduced in 2.2; I am not sure what version you are using. If older, you can either upgrade (the current version is 2.5.3), or you can move your publishing code into a custom recoverer.
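For the custom recoverer route, a minimal sketch might look like the following. It keeps the two-argument `SeekToCurrentErrorHandler` constructor from the question and passes a lambda where `null` was. The error handler calls it once, after the last failed delivery. The injected `KafkaTemplate<String, String>` and `ConsumerFactory` parameters are assumptions about how the application's beans are wired, and the other factory settings from the question are omitted for brevity.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.failed.topic}")
    private String failedTopic;

    @Bean
    ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaManualAckListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,   // assumed to be an existing bean
            KafkaTemplate<String, String> kafkaTemplate) {     // assumed to be an existing bean
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The recoverer runs only after the 5th failed delivery,
        // so the failed topic receives the message once.
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                (record, exception) ->
                        kafkaTemplate.send(failedTopic, String.valueOf(record.value())),
                5));
        return factory;
    }
}
```

With this in place, the listener's catch block should no longer call `kafkaTemplate.send(...)` itself; it only needs to let the exception propagate so the error handler can count the failure.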
Newer versions (since 2.3) allow adding a back off delay between retry attempts.
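As a rough sketch of that setup on 2.3 or later, the error handler can combine a `DeadLetterPublishingRecoverer` with a `FixedBackOff`. The `KafkaTemplate<Object, Object>` and `ConsumerFactory` beans are assumptions about the application's wiring. The template's serializers must be able to handle the key and value of the failed record, because the recoverer republishes the original record. The destination resolver below targets the failed topic from the question and reuses the source partition, so that topic needs at least as many partitions as the source topic.

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.failed.topic}")
    private String failedTopic;

    @Bean
    ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaManualAckListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,   // assumed to be an existing bean
            KafkaTemplate<Object, Object> kafkaTemplate) {     // assumed to be an existing bean
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Publish the failed record to the configured failed topic, same partition number.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (record, exception) -> new TopicPartition(failedTopic, record.partition()));

        // 10 s between attempts; 4 retries after the first delivery = 5 attempts in total.
        factory.setErrorHandler(
                new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(10_000L, 4L)));
        return factory;
    }
}
```

Because the back off now supplies the delay, the `Thread.sleep(10000)` and the `kafkaTemplate.send(...)` call in the listener's catch block become redundant; the listener only has to rethrow the exception.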
Upvotes: 0