Joyson Rego

Reputation: 1078

How to set the retry interval when using SeekToCurrentErrorHandler in Spring Kafka

I am handling container listener errors with SeekToCurrentErrorHandler in a Spring Boot application. I want to set a retry interval: if an exception occurs, the listener should wait a fixed time and then retry, until the maximum number of attempts is reached.

I tried adding a RetryTemplate with a backoff policy, but it didn't work as expected. When the error occurs for the maximum number of attempts, SeekToCurrentErrorHandler is called 2 times.

    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }


    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }


    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
    // ------->>     factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
        // handle errors 
        }, retryMaxAttempts);
        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


How can I set the retry interval when using SeekToCurrentErrorHandler?

Upvotes: 2

Views: 1736

Answers (1)

Gary Russell

Reputation: 174554

It's a new feature in the next release (2.3).

You can, however, do it with a retry template, but the total number of delivery attempts will be the product of the two properties (the retry template's max attempts and the error handler's max attempts).
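To make the multiplication concrete, here is a minimal plain-Java sketch. It does not use Spring Kafka or Spring Retry; it only simulates the nesting under the assumption that the retry template retries inside each delivery, and the error handler re-seeks (redelivers) the record each time the template gives up. The class name `RetryMultiplication` and the method `totalInvocations` are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetryMultiplication {

    /**
     * Simulates a listener that always fails and counts how often it is invoked.
     * The outer loop stands in for redeliveries triggered by the error handler,
     * the inner loop for the in-memory retries of a retry template with a fixed backoff.
     */
    public static int totalInvocations(int templateMaxAttempts,
                                       int handlerMaxAttempts,
                                       long backOffMillis) throws InterruptedException {
        AtomicInteger invocations = new AtomicInteger();
        Runnable listener = () -> {
            invocations.incrementAndGet();
            throw new IllegalStateException("simulated listener failure");
        };

        for (int delivery = 1; delivery <= handlerMaxAttempts; delivery++) {
            for (int attempt = 1; attempt <= templateMaxAttempts; attempt++) {
                try {
                    listener.run();
                    return invocations.get(); // success: no further retries needed
                } catch (IllegalStateException e) {
                    // Wait between attempts, but not after the last one of this delivery.
                    if (attempt < templateMaxAttempts) {
                        Thread.sleep(backOffMillis);
                    }
                }
            }
            // Template exhausted: the exception would now reach the error handler,
            // which seeks back so the same record is delivered again.
        }
        // Handler exhausted as well: this is where the recoverer would be called.
        return invocations.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 3 template attempts x 2 deliveries
        System.out.println(totalInvocations(3, 2, 10L)); // prints 6
    }
}
```

So with `retryMaxAttempts` set on both the template and the error handler, a permanently failing record is processed `retryMaxAttempts * retryMaxAttempts` times before the recoverer runs. If you only want the delay, one option is to keep the max attempts on one side and set the other to 1. As far as I know, from 2.3 the handler can take a `BackOff` (for example a `FixedBackOff`) directly, which makes the retry template unnecessary for this purpose.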

Upvotes: 2
