Reputation: 3
I want to use a RetryTemplate together with a DeadLetterPublishingRecoverer.
How can I configure them so that the retry count and retry interval are taken from the RetryTemplate, and the record is moved to the DLQ once the retries are exhausted?
@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    // Limit the number of delivery attempts
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
    // Wait a fixed interval between attempts
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(retryInterval);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
        ChainedKafkaTransactionManager<String, String> chainedTM) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setSyncCommits(true);
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setTransactionManager(chainedTM);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 1));
    return factory;
}
Upvotes: 0
Views: 3400
Reputation: 174769
You should do the recovery (publishing) in the retry logic rather than in the error handler. See this answer.
factory.setRecoveryCallback(context -> {
    recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
            (Exception) context.getLastThrowable());
    return null;
});
Where recoverer is the DeadLetterPublishingRecoverer.
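Putting it together, a minimal sketch of the wiring (reusing the consumerFactory(), retryTemplate() and template beans from the question; the recoverer bean and its name are illustrative). The error handler then no longer needs its own recoverer, since publishing to the DLT happens in the recovery callback:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
    // Default resolver: publish to <originalTopic>.DLT, same partition as the original record
    return new DeadLetterPublishingRecoverer(template);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        DeadLetterPublishingRecoverer recoverer) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Attempt count and back-off come from the RetryTemplate...
    factory.setRetryTemplate(retryTemplate());
    // ...and once retries are exhausted, the recovery callback publishes to the DLT
    factory.setRecoveryCallback(context -> {
        recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                (Exception) context.getLastThrowable());
        return null;
    });
    // other container properties from the question omitted for brevity
    return factory;
}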
EDIT
/**
 * Create an instance with the provided template and destination resolving function,
 * that receives the failed consumer record and the exception and returns a
 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
 * 0, no partition is set when publishing to the topic.
 * @param template the {@link KafkaTemplate} to use for publishing.
 * @param destinationResolver the resolving function.
 */
public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    this(Collections.singletonMap(Object.class, template), destinationResolver);
}
If the DLT doesn't have as many partitions as the original topic, you need a custom destination resolver:
(record, exception) -> new TopicPartition("my.DLT", -1)
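For example, passed to the constructor shown above (a sketch; my.DLT is a placeholder topic name):
import org.apache.kafka.common.TopicPartition;

// Custom resolver: fixed DLT topic, partition chosen by Kafka
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (record, exception) -> new TopicPartition("my.DLT", -1));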
With a negative partition, Kafka will choose the partition; the default resolver uses the same partition as the original record.
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
This is explained in [the documentation](https://docs.spring.io/spring-kafka/docs/2.2.7.RELEASE/reference/html/#dead-letters):
You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition. By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic. If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.
Upvotes: 1