LuckyProgrammer

Reputation: 47

Spring Cloud Kafka retries infinitely when a consumer fails

I have an issue where one of my consumer functions throws an exception, which causes Kafka to retry the same records over and over.

@Bean
public Consumer<List<RuleEngineSubject>> processCohort() {
    // Batch consumer: an exception from processSubject fails the entire batch
    return personDtoList -> {
        for (RuleEngineSubject subject : personDtoList) {
            processSubject(subject);
        }
    };
}

This is the consumer; processSubject throws a custom exception, which causes the batch to fail.

processCohort-in-0:
  destination: internal-process-cohort
  consumer:
    max-attempts: 1
    batch-mode: true
    concurrency: 10
  group: process-cohort-group

The above is my binding configuration for the Kafka binder.

I am trying to retry two times and then send the failed records to a dead letter queue, but I have been unsuccessful so far and am not sure which approach is right.

I have tried to implement a custom error handler that handles the error when the batch fails and stops the retries, but I am not sure how to send the records to a dead letter queue.

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, dest, group) -> {
        if (group.equals("process-cohort-group")) {
            container.setBatchErrorHandler(new BatchErrorHandler() {
                @Override
                public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
                    // Log the records in the failed batch
                    data.records(dest).forEach(r -> System.out.println(r.value()));
                    System.out.println("failed payload=" + thrownException.getLocalizedMessage());
                }
            });
        }
    };
}

This stops the infinite retry but does not send anything to a dead letter queue. Can I get suggestions on how to retry two times and then send to a dead letter queue? From my understanding, a batch listener does not know how to recover when there is an error; could someone help shed light on this?

Upvotes: 0

Views: 1769

Answers (2)

yakup yalçın

Reputation: 84

Retry 15 times, then publish the failed record to the topicname.DLT topic:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    // Once the back-off is exhausted, the recoverer publishes the failed record
    // to <topic>.DLT via the KafkaTemplate
    factory.setCommonErrorHandler(
        new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate()), kafkaBackOffPolicy()));
    factory.setConsumerFactory(kafkaConsumerFactory());
    return factory;
}

@Bean
public ExponentialBackOffWithMaxRetries kafkaBackOffPolicy() {
    // Up to 15 retries: 500 ms initial delay, doubling each attempt, capped at 2 s
    var exponentialBackOff = new ExponentialBackOffWithMaxRetries(15);
    exponentialBackOff.setInitialInterval(Duration.ofMillis(500).toMillis());
    exponentialBackOff.setMultiplier(2);
    exponentialBackOff.setMaxInterval(Duration.ofSeconds(2).toMillis());
    return exponentialBackOff;
}
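The kafkaTemplate() and kafkaConsumerFactory() beans referenced above are assumed to be defined elsewhere in the configuration. A minimal sketch of what they could look like (the bootstrap server and String serializers below are placeholders, not part of the original answer):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }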

Upvotes: 1

Gary Russell

Reputation: 174554

You need to configure a suitable error handler in the listener container; you can disable retry and DLQ in the binding and use a DeadLetterPublishingRecoverer instead. See the answer to Retry max 3 times when consuming batches in Spring Cloud Stream Kafka Binder.
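Applied to the customizer from the question, that could look roughly like the sketch below. Assumptions not stated in the answer: spring-kafka 2.8+ (where DefaultErrorHandler also handles batch listeners) and an injectable KafkaTemplate for publishing to the DLT; FixedBackOff(0L, 2L) gives two retries with no delay before the recoverer runs:

    import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
            KafkaTemplate<?, ?> kafkaTemplate) {
        return (container, dest, group) -> {
            if (group.equals("process-cohort-group")) {
                // By default the recoverer publishes failed records to
                // <original-topic>.DLT, i.e. internal-process-cohort.DLT here
                var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
                // Two retries with no back-off, then recover (publish to the DLT)
                container.setCommonErrorHandler(
                    new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L)));
            }
        };
    }

Because the binding uses batch-mode: true, the error handler retries the whole batch; throwing a BatchListenerFailedException from the listener lets it commit past the good records and only retry and dead-letter the failing one.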

Upvotes: 0
