whywake
whywake

Reputation: 910

Spring Kafka produce message after consumer retry completed

I have a kafka consumer that consumes messages from a topic and processes them. I have applied a retry attempt of 5 to process the message and write that message to another topic for future reference in case the processing fails even after 5 attempts. My code looks like below:

KafkaConsumerConfig:

   @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
        factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
        return factory;
    }

KafkaConsumer:

    @Value("${kafka.consumer.failed.topic}")
    private String failedTopic;
    
    @KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.groupId}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void processMessage(String kafkaMessage) throws Exception {
        log.info("parsing new kafka message {}", kafkaMessage);
        TransactionDTO transactionDTO = TransformUtil.fromJson(kafkaMessage, TransactionDTO.class);
        try {
            service.parseTransactionDTO(transactionDTO);
        } catch (Exception e) {
            log.error(e.getMessage());
            kafkaTemplate.send(failedTopic, kafkaMessage);
            Thread.sleep(10000);
            throw e;
        }
    }

Consumer correctly tries processing in 5 attempts with a delay of 10 seconds but each time, it fails, a new message is written into the failed topic. Is there a way, the message is written to the failed topic only once when all the retry attempts have been exhausted instead of writing it each time on failure?

Upvotes: 0

Views: 340

Answers (1)

Gary Russell
Gary Russell

Reputation: 174729

factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));

Use a DeadLetterPublishingRecoverer instead of null in the error handler.

It was introduced in 2.2; I am not sure what version you are using. If older, you can either upgrade (the current version is 2.5.3), or you can move your publishing code into a custom recoverer.

Newer versions (since 2.3) allow adding a back off delay between retry attempts.

Upvotes: 0

Related Questions