Royhunt

Reputation: 3

How can we use DeadLetterPublishingRecoverer with RetryTemplate?

I want to use RetryTemplate together with DeadLetterPublishingRecoverer.

How can I configure it so that the retry count and retry interval are taken from the RetryTemplate, and the record is moved to the DLQ once the retries are exhausted?

    @Bean
    public RetryTemplate retryTemplate(){
        RetryTemplate retryTemplate = new RetryTemplate();
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, String> chainedTM) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 1));
        return factory;
    }

Upvotes: 0

Views: 3400

Answers (1)

Gary Russell

Reputation: 174769

You should do the recovery (publishing) in the retry logic rather than in the error handler. See this answer.

        factory.setRecoveryCallback(context -> {
            recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                    (Exception) context.getLastThrowable());
            return null;
        });

Where recoverer is the DeadLetterPublishingRecoverer.
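To make the flow concrete, here is a minimal plain-Java model of what a retry template combined with a recovery callback does: invoke the listener up to a maximum number of attempts with a fixed pause in between, and hand the record and the last exception to a recoverer only once the attempts are exhausted. This is a simplified sketch, not Spring Retry's implementation; the class `RetryThenRecoverSketch`, the method `process` and its parameters are hypothetical names, and the `BiConsumer` merely stands in for the DeadLetterPublishingRecoverer.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class RetryThenRecoverSketch {

    /**
     * Calls the listener until it succeeds or maxAttempts is reached.
     * After the final failure the recoverer receives the record and the
     * last exception (this is where a dead-letter publish would happen).
     * Returns the number of attempts that were made.
     */
    public static <T> int process(T record, Consumer<T> listener, int maxAttempts,
            long backOffMillis, BiConsumer<T, Exception> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(record);
                return attempt; // success: no recovery needed
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(backOffMillis); // fixed back-off between attempts
                }
            }
        }
        // Retries exhausted: recover instead of rethrowing.
        recoverer.accept(record, last);
        return maxAttempts;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}
```

Because the recoverer runs inside the retry logic and the call then returns normally, the error handler never sees the exception, which is why the publishing belongs in the recovery callback rather than in the error handler.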

EDIT

/**
 * Create an instance with the provided template and destination resolving function,
 * that receives the failed consumer record and the exception and returns a
 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
 * 0, no partition is set when publishing to the topic.
 * @param template the {@link KafkaTemplate} to use for publishing.
 * @param destinationResolver the resolving function.
 */
public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    this(Collections.singletonMap(Object.class, template), destinationResolver);
}

If the DLT doesn't have as many partitions as the original topic, you need a custom destination resolver:

(record, exception) -> new TopicPartition("my.DLT", -1)

With a negative partition, Kafka will choose the partition; the default resolver uses the same partition.

DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

This is explained in [the documentation](https://docs.spring.io/spring-kafka/docs/2.2.7.RELEASE/reference/html/#dead-letters):

You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition. By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic. If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.
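The partition rule above can be sketched in plain Java without any Kafka classes. The class `DltPartitionSketch`, its methods, and the `dltPartitionCount` parameter are hypothetical; the "keep the partition if it exists, otherwise return -1" rule is an assumption made for illustration, not what the default resolver does (the default always reuses the original partition).

```java
public class DltPartitionSketch {

    /** Dead-letter topic name following the "<originalTopic>.DLT" convention. */
    public static String dltTopic(String originalTopic) {
        return originalTopic + ".DLT";
    }

    /**
     * Chooses the partition for the dead-letter record.
     * Keeps the original partition when the dead-letter topic has it;
     * otherwise returns -1 so that no partition is set and Kafka selects one.
     */
    public static int dltPartition(int originalPartition, int dltPartitionCount) {
        return originalPartition < dltPartitionCount ? originalPartition : -1;
    }
}
```

In a real destination resolver, the two values would be wrapped as `new TopicPartition(topic, partition)`, and the partition count of the dead-letter topic would have to be supplied by your own configuration.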

Upvotes: 1
