We are trying to configure retryable exceptions together with Kafka transactions and dead letter queues in Spring Cloud Stream. That is, we want to specify which exceptions should be retried and which should not.
It seems, however, that as soon as you enable transactions (by setting spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix), default-retryable and retryable-exceptions are ignored. Should they be configured in a different way?
Here is a minimal example to reproduce the problem. Just define a @SpringBootApplication with a consumer that always throws IllegalArgumentException:
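import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class ConsumerConfig {

    // Always fails, so every delivery goes through the error-handling path.
    @Bean
    public Consumer<String> consumeMessage() {
        return s -> {
            log.info("Consuming {}", s);
            throw new IllegalArgumentException(s);
        };
    }
}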
and application.yml:
spring:
  cloud:
    function:
      definition: consumeMessage
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: transaction-
          required-acks: all
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        bindings:
          consumeMessage-in-0:
            consumer:
              enable-dlq: true
      bindings:
        consumeMessage-in-0:
          group: my-group
          destination: my-topic
          consumer:
            default-retryable: false
            max-attempts: 5
            back-off-initial-interval: 100
            retryable-exceptions:
              java.lang.UnsupportedOperationException: true
              java.lang.IllegalArgumentException: false
This is with Spring Boot 3.4.0 and Spring Cloud 2024.0.0 and the following dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
I have also pushed a GitHub project with the same code.
When I publish a message to my-topic, it is retried 5 times despite default-retryable: false and java.lang.IllegalArgumentException: false. If I unset transaction-id-prefix, it works as intended.
Diving into the Spring Cloud Stream code, I found that KafkaMessageChannelBinder will set a RetryTemplate configured by buildRetryTemplate(properties) if there is no TransactionManager, but if there is one, it will configure an AfterRollbackProcessor instead, passing it only a BackOff without using the retryable exceptions configuration.
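
To make the difference between the two paths concrete, here is a small plain-JDK model of what I expect versus what I observe. It is only a sketch: the class and method names (RetryClassificationSketch, isRetryable, attemptsWithClassifier, attemptsWithBackOffOnly) are hypothetical and are not Spring classes, and the superclass lookup is my assumption about how the retryable-exceptions map is meant to be read, not a copy of the binder's logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RetryClassificationSketch {

    /** The rules from the application.yml in the question (retryable-exceptions). */
    public static Map<Class<? extends Throwable>, Boolean> exampleRules() {
        Map<Class<? extends Throwable>, Boolean> rules = new LinkedHashMap<>();
        rules.put(UnsupportedOperationException.class, true);
        rules.put(IllegalArgumentException.class, false);
        return rules;
    }

    /** Hypothetical classifier: explicit rule first, otherwise default-retryable. */
    public static boolean isRetryable(Map<Class<? extends Throwable>, Boolean> rules,
                                      boolean defaultRetryable, Throwable failure) {
        // Assumption: a rule for a superclass also applies to its subclasses.
        for (Class<?> c = failure.getClass(); c != null; c = c.getSuperclass()) {
            Boolean rule = rules.get(c);
            if (rule != null) {
                return rule;
            }
        }
        return defaultRetryable;
    }

    /** Deliveries when the classification is consulted before each retry. */
    public static int attemptsWithClassifier(Map<Class<? extends Throwable>, Boolean> rules,
                                             boolean defaultRetryable, int maxAttempts,
                                             Throwable failure) {
        int attempts = 1; // the first delivery always happens
        while (attempts < maxAttempts && isRetryable(rules, defaultRetryable, failure)) {
            attempts++;
        }
        return attempts;
    }

    /** Deliveries when only the back-off (max attempts) is known, as observed with transactions. */
    public static int attemptsWithBackOffOnly(int maxAttempts) {
        return maxAttempts; // no classification, so every failure is retried until exhausted
    }

    public static void main(String[] args) {
        Throwable failure = new IllegalArgumentException("boom");
        System.out.println("with classifier:   "
                + attemptsWithClassifier(exampleRules(), false, 5, failure));
        System.out.println("with back-off only: " + attemptsWithBackOffOnly(5));
    }
}
```

With the rules above, the classifier path delivers an IllegalArgumentException once, which is what I get without transaction-id-prefix, while the back-off-only path delivers it 5 times, which matches what I see with transactions enabled.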