user1945064
user1945064

Reputation: 209

How to set change the retry attempts based of the exception type in Kafka SeekToCurrentErrorHandler?

I have a spring boot application and i am trying to create a Kafka retry listener wherein, based of the exception type i need to change the retry attempts.

For example:

If exception type is A then the retry attempts should be 5
If exception type is B then the retry attempts should be 10

Can anyone recommend how do to this in Spring Kafka?

Following is my ListenerFactory. I am using SeekToCurrentErrorHandler

Bean
    public ConcurrentKafkaListenerContainerFactory<String, test> retryListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, test> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            System.out.println(
                    "RetryPolicy** limit has been exceeded! You should really handle this better." + record.key());
        }, new FixedBackOff(0L, 3L));
        errorHandler.addNotRetryableException(IllegalArgumentException.class);
        errorHandler.setCommitRecovered(true);
        factory.setErrorHandler(errorHandler);
        // to keep the consumers alive the failure gets reported to broker so that
        // consumers remain alive
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }

Upvotes: 1

Views: 1666

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

Starting with version 2.6, you can add a function to determine the BackOff to use, based on the consumer record and/or exception:

/**
 * Set a function to dynamically determine the {@link BackOff} to use, based on the
 * consumer record and/or exception. If null is returned, the default BackOff will be
 * used.
 * @param backOffFunction the function.
 * @since 2.6
 */
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
    this.failureTracker.setBackOffFunction(backOffFunction);
}

This is only called when there is no current BackOff for this record (i.e. it won't help with your other question).

Upvotes: 2

Related Questions