Reputation: 51
I have a @StreamListener method that performs a REST call. When the REST call throws an exception, the @StreamListener method throws a RuntimeException and is retried. However, the method is retried an unlimited number of times whenever it throws a RuntimeException.
Spring Cloud Stream Retry configuration:
spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000
Spring Boot microservice dependency versions:
Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0
Upvotes: 2
Views: 3189
Reputation: 370
Using RetryTemplate or increasing the maxAttempts property has the restriction that all retries must complete within max.poll.interval.ms, otherwise the Kafka broker will consider the consumer down and reassign the partition to another consumer (if available).
Another option is to make the listener re-read the same message from Kafka using the consumer.seek method.
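If the long retry window has to stay, one way to respect that restriction is to raise max.poll.interval.ms for the binding. The fragment below is only a sketch: it assumes the binding is named inputChannel as in the question, it passes the setting through the Kafka binder's per-binding consumer `configuration` map, and the 20-minute value is an illustrative assumption rather than a recommendation.

```properties
# Assumed value (20 minutes); pick something larger than the worst-case retry duration
spring.cloud.stream.kafka.bindings.inputChannel.consumer.configuration.max.poll.interval.ms=1200000
```

Shortening backOffInitialInterval and backOffMaxInterval so that the retries finish within the default interval would be the alternative.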
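import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@StreamListener("events")
public void handleEvent(@Payload String eventString,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.OFFSET) String offset) {
    try {
        // do the logic (example: REST call)
    } catch (Exception e) { // Catch only specific exceptions that can be retried
        // Rewind to this record's offset so the next poll delivers it again
        consumer.seek(new TopicPartition(topic, Integer.parseInt(partitionId)), Long.parseLong(offset));
    }
}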
Upvotes: 2
Reputation: 51
After some trial and error, we found that the Kafka setting max.poll.interval.ms defaults to 5 minutes. With our consumer retry settings, the whole retry process takes 15 minutes in the worst case.
So 5 minutes after the first message is consumed, Kafka decides that the consumer is not responding, triggers a rebalance, and reassigns the partition to another consumer, which then receives the same message again.
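The 15-minute figure can be checked with a little arithmetic. The sketch below is not part of Spring Cloud Stream; the class and method names are hypothetical, and it assumes exponential back-off with the default backOffMultiplier of 2.0, capped at backOffMaxInterval. It counts only the pauses between attempts, not the time spent in the REST call itself.

```java
public class RetryBudget {

    // Sums the pauses between attempts for a capped exponential back-off.
    // maxAttempts attempts are separated by (maxAttempts - 1) pauses.
    public static long worstCaseBackOffMillis(int maxAttempts, long initialIntervalMs,
                                              double multiplier, long maxIntervalMs) {
        long total = 0;
        double interval = initialIntervalMs;
        for (int pause = 1; pause < maxAttempts; pause++) {
            total += Math.min((long) interval, maxIntervalMs);
            interval *= multiplier;
        }
        return total;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // Kafka default: 5 minutes
        // Values from the question: maxAttempts=3, 300000 ms initial, 600000 ms max
        long backOff = worstCaseBackOffMillis(3, 300_000L, 2.0, 600_000L);
        System.out.println("worst-case back-off ms: " + backOff);
        System.out.println("exceeds max.poll.interval.ms: " + (backOff > maxPollIntervalMs));
    }
}
```

With the settings from the question, the pauses are 5 minutes and 10 minutes, so the back-off alone is three times the default max.poll.interval.ms.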
Upvotes: 0
Reputation: 6126
You can certainly increase the number of attempts (the maxAttempts property) to something like Integer.MAX_VALUE, or you can provide an instance of your own RetryTemplate bean, which can be configured as you wish.
More information is available here: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_retry_template
Upvotes: 0