Reputation: 1727
I'm using spring-kafka 2.2.8.RELEASE with the @KafkaListener annotation to create a consumer. Here is my consumer configuration code:
@Bean
public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

@Bean
public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(MyConsumerConfig.getConfigs());
}

public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setListeners(new RetryListener[]{myKafkaRetryListener});
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(retryPolicy);
    ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
    exponentialBackOffPolicy.setInitialInterval(500);
    // As per the spring-kafka documentation, maxInterval (60000 ms) should be set less than max.poll.interval.ms (600000 ms)
    exponentialBackOffPolicy.setMaxInterval(60000);
    retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
    return retryTemplate;
}
Here is my custom retry listener code:
@Component
public class MyRetryListener implements RetryListener {

    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        System.out.println("##### IN open method");
        return false;
    }

    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
            Throwable throwable) {
        System.out.println("##### IN close method");
    }

    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
            Throwable throwable) {
        System.out.println("##### Got an error and will retry");
    }
}
Now, when I send a message to a test topic, I throw a TimeoutException in the consumer so that the retry is triggered. Here is my consumer code:
@KafkaListener(topics = "CONSUMER_RETRY_TEST_TOPIC")
public void listen(ConsumerRecord message) throws RetriableException {
    System.out.println("CONSUMER_RETRY testing - Received message with key "+message.key()+" on topic " + CONSUMER_RETRY_TEST_TOPIC + " \n \n ");
    throw new TimeoutException();
}
With the above configuration, the retry is not triggered, the onError method of my custom retry listener is never invoked, and I get the error below. What am I missing here?
org.springframework.retry.TerminatedRetryException: Retry terminated abnormally by interceptor before first attempt
Upvotes: 1
Views: 1272
Reputation: 174554
See the Javadoc for RetryListener.open():
<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback)
Called before the first attempt in a retry. For instance, implementers can set up state that is needed by the policies in the RetryOperations. The whole retry can be vetoed by returning false from this method, in which case a TerminatedRetryException will be thrown.
Type Parameters:
T - the type of object returned by the callback
E - the type of exception it declares may be thrown
Parameters:
context - the current RetryContext.
callback - the current RetryCallback.
Returns:
true if the retry should proceed.
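You need to return true, not false. Your open() implementation returns false, which vetoes the whole retry before the first attempt and causes the TerminatedRetryException.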
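To make the veto behaviour concrete, here is a minimal plain-Java sketch of the contract described in that Javadoc. It is not Spring Retry's implementation: MiniRetryListener, VetoedException, execute and run are hypothetical names made up for illustration, and VetoedException merely stands in for TerminatedRetryException. It only shows that a false result from open() stops everything before the callback runs, while true lets the attempts and onError calls happen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified model of the open() veto contract. NOT Spring Retry's code;
// every type and method name below is hypothetical.
class RetryVetoSketch {

    /** Hypothetical miniature of a retry listener. */
    interface MiniRetryListener {
        boolean open();                     // returning false vetoes the whole retry
        void onError(Throwable throwable);  // called after each failed attempt
    }

    /** Stands in for TerminatedRetryException in this sketch. */
    static class VetoedException extends RuntimeException {
        VetoedException(String message) {
            super(message);
        }
    }

    /** Runs the callback up to maxAttempts times unless the listener vetoes. */
    static <T> T execute(MiniRetryListener listener, int maxAttempts, Supplier<T> callback) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        if (!listener.open()) {
            // The callback is never invoked when open() returns false.
            throw new VetoedException("Retry terminated by listener before first attempt");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callback.get();
            } catch (RuntimeException e) {
                last = e;
                listener.onError(e);
            }
        }
        throw last; // all attempts failed
    }

    /** Records what happens when open() returns the given value. */
    static List<String> run(boolean openResult) {
        List<String> events = new ArrayList<>();
        MiniRetryListener listener = new MiniRetryListener() {
            @Override
            public boolean open() {
                events.add("open");
                return openResult;
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("onError");
            }
        };
        try {
            execute(listener, 3, () -> {
                events.add("attempt");
                throw new IllegalStateException("simulated listener failure");
            });
        } catch (VetoedException e) {
            events.add("vetoed");
        } catch (IllegalStateException e) {
            events.add("exhausted");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [open, vetoed]
        System.out.println(run(true));  // open, then three attempt/onError pairs, then exhausted
    }
}
```

In the question's MyRetryListener, the equivalent change would be returning true from open(); the rest of the listener can stay as it is.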
Upvotes: 1