shivam fialok

Reputation: 151

Spring Cloud Stream Kafka Binder - Retries not working when using DLQ in Batch Mode

I am using Spring Cloud version 2023.0.1 (Spring Cloud Stream version 4.1.1) and I have written a simple Kafka consumer in batch mode to simulate an error scenario.

    @Bean
    Consumer<Message<List<String>>> consumer1() {
        return message -> {
            final List<String> payload = message.getPayload();
            final MessageHeaders messageHeaders = message.getHeaders();
            payload.forEach(System.out::println);
            payload.forEach(p -> {
                if(p.startsWith("a")) {
                    throw new RuntimeException("Intentional Exception");
                }
            });
            System.out.println(messageHeaders);
            System.out.println("Done");
        };
    }

My application.yml file looks like this:

spring:
  cloud:
    function:
      definition: consumer1;
    stream:
      bindings:
        consumer1-in-0:
          destination: topic1
          group: consumer1-in-0-v0.1
          consumer:
            batch-mode: true
            use-native-decoding: true
            max-attempts: 3
      kafka:
        binder:
          brokers:
            - localhost:9092
        default:
          consumer:
            configuration:
              max.poll.records: 1000
              max.partition.fetch.bytes: 31457280
              fetch.max.wait.ms: 200
        bindings:
          consumer1-in-0:
            consumer:
              enableDlq: true
              dlqName: dlq-topic
              dlqProducerProperties:
                configuration:
                  value.serializer: org.apache.kafka.common.serialization.StringSerializer
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

I have also specified a ListenerContainerWithDlqAndRetryCustomizer to customize the retries:

    @Bean
    ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
        return new ListenerContainerWithDlqAndRetryCustomizer() {

            @Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                                  String group,
                                  @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                                  @Nullable BackOff backOff) {

                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }

            @Override
            public boolean retryAndDlqInBinding(String destinationName, String group) {
                return false;
            }

        };
    }

The issue: when an error happens, the message batch goes straight to the DLQ and no retries are attempted.

A batch can fail because of a transient error, so I want it to be retried a few times before it is sent to the DLQ, but I cannot get that to work.
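
To make the expected behavior concrete, here is a minimal plain-Java sketch of what I am after. It is not how the binder or Spring Kafka implements retries; `RetryThenDlqSketch`, `deliver` and the in-memory `deadLetters` list are hypothetical stand-ins, and the limit of 3 deliveries only mirrors the max-attempts value in my configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryThenDlqSketch {

    // Hypothetical stand-in for the dead-letter topic: it just collects failed batches.
    static final List<List<String>> deadLetters = new ArrayList<>();

    // Delivers the whole batch up to maxDeliveries times.
    // Returns the number of deliveries made; if all of them fail,
    // the batch is handed to the dead-letter list.
    static int deliver(List<String> batch, Consumer<List<String>> handler, int maxDeliveries) {
        for (int attempt = 1; attempt <= maxDeliveries; attempt++) {
            try {
                handler.accept(batch);
                return attempt; // success, no dead-lettering
            } catch (RuntimeException e) {
                System.out.println("Delivery " + attempt + " failed: " + e.getMessage());
            }
        }
        deadLetters.add(batch); // retries exhausted
        return maxDeliveries;
    }

    public static void main(String[] args) {
        // Same failure rule as consumer1: any payload starting with "a" throws.
        Consumer<List<String>> handler = batch -> batch.forEach(p -> {
            if (p.startsWith("a")) {
                throw new RuntimeException("Intentional Exception");
            }
        });
        int deliveries = deliver(List.of("b", "apple"), handler, 3);
        System.out.println(deliveries + " deliveries, dead letters: " + deadLetters);
    }
}
```

What I observe instead corresponds to the batch landing in the dead-letter list after the very first failed delivery.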

What am I doing wrong?

Upvotes: 0

Views: 217

Answers (2)

sobychacko

Reputation: 5924

The original question exposed a bug in the binder, which I believe was causing the issue you saw before. See this issue for details: https://github.com/spring-cloud/spring-cloud-stream/issues/2951.

If you try the binder version 4.1.2-SNAPSHOT, the above code works for me:

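import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.kafka.ListenerContainerWithDlqAndRetryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.backoff.BackOff;

@SpringBootApplication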
public class So78485425Application {

    public static void main(String[] args) {
        SpringApplication.run(So78485425Application.class, args);
    }

    @Bean
    Consumer<Message<List<String>>> consumer1() {
        return message -> {
            final List<String> payload = message.getPayload();
            final MessageHeaders messageHeaders = message.getHeaders();
            payload.forEach(System.out::println);
            payload.forEach(p -> {
                if(p.startsWith("a")) {
                    throw new RuntimeException("Intentional Exception");
                }
            });
            System.out.println(messageHeaders);
            System.out.println("Done");
        };
    }

    @Bean
    public KafkaTemplate<String, String> stringStringKafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf, Map.of(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }

    @Bean
    ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> stringStringKafkaTemplate) {
        return new ListenerContainerWithDlqAndRetryCustomizer() {

            @Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                                  String group,
                                  BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                                  BackOff backOff) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(stringStringKafkaTemplate,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }

            @Override
            public boolean retryAndDlqInBinding(String destinationName, String group) {
                return false;
            }

        };
    }

}

and the corresponding configuration:

spring:
  cloud:
    function:
      definition: consumer1
    stream:
      bindings:
        consumer1-in-0:
          destination: topic1
          group: consumer1-in-0-v0.1
          consumer:
            batch-mode: true
            use-native-decoding: true
            max-attempts: 3
      kafka:
        binder:
          brokers:
            - localhost:9092
        default:
          consumer:
            configuration:
              max.poll.records: 1000
              max.partition.fetch.bytes: 31457280
              fetch.max.wait.ms: 200
        bindings:
          consumer1-in-0:
            consumer:
              enableDlq: true
              dlqName: dlq-topic
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

Upvotes: 1

shivam fialok

Reputation: 151

In case anyone stumbles upon this in the future: I figured out what was wrong. I had to remove enableDlq, dlqName and dlqProducerProperties from the application.yml file.

Then it worked.

In the Java code, I also removed ListenerContainerWithDlqAndRetryCustomizer and just used ListenerContainerCustomizer. The code looked something like this:

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
        return (container, dest, group) -> container.setCommonErrorHandler(errorHandler);
    }

    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4));
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate,
                                                   KafkaOperations<?, ?> bytesTemplate,
                                                   KafkaOperations<?, ?> longTemplate) {
        Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
        templates.put(String.class, stringTemplate);
        templates.put(byte[].class, bytesTemplate);
        templates.put(Long.class, longTemplate);
        return new DeadLetterPublishingRecoverer(templates);
    }

    @Bean
    public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }

    @Bean
    public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) {
        return new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }
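
As far as I understand it, the recoverer picks a template by walking the map in order and taking the first key type that matches the record value, which is why the map above is a LinkedHashMap. Below is a simplified plain-Java sketch of that lookup. `TemplateLookupSketch` and `pick` are hypothetical names, and the strings stand in for the KafkaOperations beans; this is not the actual Spring Kafka code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TemplateLookupSketch {

    // Returns the first entry (in iteration order) whose key type accepts the value,
    // or null when nothing matches.
    static String pick(Map<Class<?>, String> templates, Object value) {
        for (Map.Entry<Class<?>, String> entry : templates.entrySet()) {
            if (entry.getKey().isInstance(value)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Insertion order is preserved, so more specific types should come first.
        Map<Class<?>, String> templates = new LinkedHashMap<>();
        templates.put(String.class, "stringTemplate");
        templates.put(byte[].class, "bytesTemplate");
        templates.put(Long.class, "longTemplate");

        System.out.println(pick(templates, "hello"));
        System.out.println(pick(templates, new byte[0]));
        System.out.println(pick(templates, 42L));
    }
}
```

With an unordered map, a broad key such as Object.class could shadow the more specific entries, so the insertion order matters.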

Upvotes: 0
