Reputation: 209
I have a spring boot application and i am trying to create a Kafka retry listener wherein, based of the exception type i need to change the retry attempts.
For example:
If exception type is A then the retry attempts should be 5
If exception type is B then the retry attempts should be 10
Can anyone recommend how do to this in Spring Kafka?
Following is my ListenerFactory. I am using SeekToCurrentErrorHandler
Bean
public ConcurrentKafkaListenerContainerFactory<String, test> retryListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, test> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
System.out.println(
"RetryPolicy** limit has been exceeded! You should really handle this better." + record.key());
}, new FixedBackOff(0L, 3L));
errorHandler.addNotRetryableException(IllegalArgumentException.class);
errorHandler.setCommitRecovered(true);
factory.setErrorHandler(errorHandler);
// to keep the consumers alive the failure gets reported to broker so that
// consumers remain alive
factory.setStatefulRetry(true);
factory.setConcurrency(2);
return factory;
}
Upvotes: 1
Views: 1666
Reputation: 174494
Starting with version 2.6, you can add a function to determine the BackOff
to use, based on the consumer record and/or exception:
/**
* Set a function to dynamically determine the {@link BackOff} to use, based on the
* consumer record and/or exception. If null is returned, the default BackOff will be
* used.
* @param backOffFunction the function.
* @since 2.6
*/
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
this.failureTracker.setBackOffFunction(backOffFunction);
}
This is only called when there is no current BackOff
for this record (i.e. it won't help with your other question).
Upvotes: 2