Ho Wing Kent

Reputation: 51

Spring Boot microservice @StreamListener retries an unlimited number of times when it throws a RuntimeException

I have a @StreamListener method that performs a REST call. When the REST call returns an exception, the @StreamListener method throws a RuntimeException so that a retry is performed. However, the @StreamListener method retries an unlimited number of times when it throws the RuntimeException, instead of stopping after the configured maxAttempts.
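A simplified sketch of the listener (the class name, channel name, and REST call details are placeholders, not my actual code):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

interface InputChannelBinding {
    @Input("inputChannel")
    SubscribableChannel inputChannel();
}

@EnableBinding(InputChannelBinding.class)
public class EventListener {

    private final RestTemplate restTemplate = new RestTemplate();

    @StreamListener("inputChannel")
    public void handle(String payload) {
        try {
            // downstream REST call (URL is a placeholder)
            restTemplate.postForObject("http://downstream/api", payload, String.class);
        } catch (RestClientException e) {
            // rethrow so that the binder's retry (maxAttempts / backOff*) is triggered
            throw new RuntimeException("REST call failed", e);
        }
    }
}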

Spring Cloud Stream Retry configuration:

spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000

Spring Boot microservice dependency versions:

Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0

Upvotes: 2

Views: 3189

Answers (3)

A developer

Reputation: 370

Using a RetryTemplate or increasing the maxAttempts property has the restriction that retries must complete within max.poll.interval.ms; otherwise the Kafka broker will think the consumer is down and reassign the partition to another consumer (if one is available).

Another option is to make the listener re-read the same message from Kafka using the consumer.seek method:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@StreamListener("events")
public void handleEvent(@Payload String eventString,
                        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.OFFSET) String offset) {
    try {
        // do the logic (example: REST call)
    } catch (Exception e) { // catch only specific exceptions that can be retried
        // re-seek to the failed record's offset so the same message is fetched again on the next poll
        consumer.seek(new TopicPartition(topic, Integer.parseInt(partitionId)), Long.parseLong(offset));
    }
}

Upvotes: 2

Ho Wing Kent

Reputation: 51

After some trial and error, we found out that the Kafka configuration max.poll.interval.ms defaults to 5 minutes. Due to our consumer retry mechanism, the whole retry process takes 15 minutes in the worst-case scenario.

So 5 minutes after the first message is consumed, Kafka decides that the consumer is not responding, triggers a rebalance, and reassigns the partition, so the same message is delivered to another consumer.
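If the retries have to stay within a single poll, one possible workaround is to raise max.poll.interval.ms above the worst-case retry duration. The property path below is an assumption based on the Kafka binder's per-binding consumer configuration map and may need adjusting for your binder version:

# 16 minutes, above the 15-minute worst case
spring.cloud.stream.kafka.bindings.inputChannel.consumer.configuration.max.poll.interval.ms=960000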

Upvotes: 0

Oleg Zhurakousky

Reputation: 6126

You can certainly increase the number of attempts (the maxAttempts property) to something like Integer.MAX_VALUE, or you can provide your own RetryTemplate bean, which can be configured however you wish. You can find more info here: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_retry_template
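For example, a minimal sketch of such a RetryTemplate bean (the policy values are just placeholders, and how the binder picks the bean up, e.g. via @StreamRetryTemplate in newer releases, depends on your Spring Cloud Stream version, see the linked docs):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // retry at most 3 times (placeholder value)
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // exponential backoff between attempts (placeholder values)
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1_000L);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10_000L);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }
}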

Upvotes: 0
