Didier L

How to configure retryable exceptions for consumers when Kafka transactions are enabled?

We are trying to configure retryable exceptions together with Kafka transactions and dead letter queues in Spring Cloud Stream. That is, we want to specify which exceptions should be retried and which should not.

It seems, however, that as soon as transactions are enabled (by setting spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix), the default-retryable and retryable-exceptions consumer properties are ignored. Should they be configured in a different way?

Here is a minimal example to reproduce the problem. Just define a @SpringBootApplication with a consumer that always throws IllegalArgumentException:

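import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class ConsumerConfig {
    // Bound as consumeMessage-in-0; fails on every record to trigger the retry logic
    @Bean
    public Consumer<String> consumeMessage() {
        return s -> {
            log.info("Consuming {}", s);
            throw new IllegalArgumentException(s);
        };
    }
}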

and application.yml:

spring:
  cloud:
    function:
      definition: consumeMessage
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: transaction-
          required-acks: all
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        bindings:
          consumeMessage-in-0:
            consumer:
              enable-dlq: true
      bindings:
        consumeMessage-in-0:
          group: my-group
          destination: my-topic
          consumer:
            default-retryable: false
            max-attempts: 5
            back-off-initial-interval: 100
            retryable-exceptions:
              java.lang.UnsupportedOperationException: true
              java.lang.IllegalArgumentException: false

This is with Spring Boot 3.4.0 and Spring Cloud 2024.0.0 and the following dependencies:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>

I have also pushed a GitHub project with the same code.

When I publish a message to my-topic, it is retried 5 times despite default-retryable: false and java.lang.IllegalArgumentException: false. If I unset transaction-id-prefix, it works as intended.

Digging into the Spring Cloud Stream code, I found that KafkaMessageChannelBinder sets a RetryTemplate built by buildRetryTemplate(properties) when there is no TransactionManager. When there is one, it configures an AfterRollbackProcessor instead and passes it only a BackOff, without using the retryable-exceptions configuration.
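
To make the difference between the two paths concrete, here is a plain-Java model of what I think is happening. It uses no Spring classes at all: RetrySketch, configuredRules, isRetryable, deliver and the consultRules flag are hypothetical names, and the class-hierarchy lookup is only my approximation of how the exception classification behaves. With consultRules set to true the rules from application.yml stop the IllegalArgumentException after the first attempt; with it set to false (back-off only) all max-attempts are used, which matches what I observe with transactions enabled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model, not Spring code: names and structure are illustrative only.
class RetrySketch {

    // Same rules as retryable-exceptions in the application.yml above.
    static Map<Class<? extends Throwable>, Boolean> configuredRules() {
        Map<Class<? extends Throwable>, Boolean> rules = new HashMap<>();
        rules.put(UnsupportedOperationException.class, true);
        rules.put(IllegalArgumentException.class, false);
        return rules;
    }

    // Nearest configured rule in the class hierarchy wins;
    // otherwise fall back to the default-retryable value.
    static boolean isRetryable(Throwable t,
                               Map<Class<? extends Throwable>, Boolean> rules,
                               boolean defaultRetryable) {
        for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
            Boolean rule = rules.get(c);
            if (rule != null) {
                return rule;
            }
        }
        return defaultRetryable;
    }

    // Returns how many times the handler was invoked for one record.
    static int deliver(Consumer<String> handler, String payload,
                       int maxAttempts, boolean consultRules) {
        Map<Class<? extends Throwable>, Boolean> rules = configuredRules();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt; // handled successfully
            } catch (RuntimeException e) {
                // default-retryable: false, as in the configuration above
                if (consultRules && !isRetryable(e, rules, false)) {
                    return attempt; // give up at once (record would go to the DLQ)
                }
            }
        }
        return maxAttempts; // all attempts exhausted
    }

    public static void main(String[] args) {
        Consumer<String> failing = s -> { throw new IllegalArgumentException(s); };
        System.out.println(deliver(failing, "msg", 5, true));  // rules consulted: prints 1
        System.out.println(deliver(failing, "msg", 5, false)); // back-off only: prints 5
    }
}
```

The first call is the behaviour I expect (and get without transaction-id-prefix); the second is what I get once transactions are enabled.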
