Reputation: 41
I’m currently implementing an error-handling mechanism for Kafka listeners. This is what I would like to achieve: when an exception occurs within a Kafka listener, I want to retry processing my record 10 times and, if it still fails, send it to a dead letter queue and inform the sender in case of a request/reply scenario (returning a specific RecordFailure object, for example).
I first defined an ErrorHandler bean, which worked fine but didn’t offer the possibility to return a value in case of failure. Then I moved to defining a KafkaListenerErrorHandler bean and specifying it as a parameter of my Kafka listener. That allowed me to return a specific value, but I lost the retry and dead-letter-queue forwarding policy defined by the ErrorHandler.
Eventually, I went with configuring my container factory with a RetryTemplate and RecoveryCallback. I thought it would work as expected, but when using a ReplyingKafkaTemplate to send my message, I always end up receiving a timeout exception; I realised that, since I defined a RetryTemplate, the onMessage() method of the RetryingMessageListenerAdapter was called instead of the ReplyingKafkaTemplate one.
I’m now questioning the scenario itself: does it make sense to combine a RetryTemplate, a RecoveryCallback, and a ReplyingKafkaTemplate to enable request/reply with retry policies using a Kafka listener? If so, what am I missing here?
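For reference, here is roughly what my current container factory configuration looks like (a simplified sketch; bean and method names are illustrative):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    // Retry each failed record (up to 10 attempts)
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10));
    factory.setRetryTemplate(retryTemplate);

    // Invoked once the retries are exhausted; this is where I want to
    // forward to the DLQ and build the RecordFailure reply
    factory.setRecoveryCallback(context -> {
        // ... publish to the dead letter topic here ...
        return null;
    });
    return factory;
}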
Thank you for your time.
Upvotes: 0
Views: 1507
Reputation: 174769
Using a retry template at the listener level is mostly redundant now that the error handlers support backoff and retry.
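For example, retry with back off plus dead-letter publishing can be configured entirely on the error handler (a sketch, not from the original answer, assuming spring-kafka 2.3 or later; factory and template are your container factory and KafkaTemplate beans):

// FixedBackOff is org.springframework.util.backoff.FixedBackOff:
// 10 delivery attempts (9 retries) one second apart, then publish to the DLT
factory.setErrorHandler(new SeekToCurrentErrorHandler(
        new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 9L)));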
One way to solve your particular problem would be to enable the deliveryAttemptHeader.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header
Then, in your listener error handler, check the header; when the specific number of attempts is reached, publish the message to a dead letter topic and return the error result. Before the count is reached, re-throw the exception so that the SeekToCurrentErrorHandler will redeliver the record.
Just make sure that the SeekToCurrentErrorHandler (STCEH) is configured with enough retries that it always redelivers, leaving the listener error handler free to do its work.
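In configuration terms, that combination might look like this (a sketch, not part of the original answer; factory is your listener container factory bean):

// Expose the attempt count to the listener as the kafka_deliveryAttempt header
factory.getContainerProperties().setDeliveryAttemptHeader(true);
// Retry "forever"; the listener error handler decides when to stop re-throwing
factory.setErrorHandler(new SeekToCurrentErrorHandler(
        new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS)));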
EDIT
Here's an example showing how to use the DeadLetterPublishingRecoverer (DLPR) from a listener error handler, by adding the raw ConsumerRecord in a header...
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class So66982480Application {

    public static void main(String[] args) {
        SpringApplication.run(So66982480Application.class, args);
    }

    @Bean
    ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            KafkaTemplate<String, String> template) {

        // Populate the kafka_deliveryAttempt header on each delivery
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setReplyTemplate(template);
        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer("so66982480-replies");
        container.getContainerProperties().setGroupId("so66982480-replies");
        return new ReplyingKafkaTemplate<>(pf, container);
    }

    @Bean
    RecordMessageConverter converter() {
        // Stash the raw ConsumerRecord in a header so the listener error
        // handler can pass it to the DeadLetterPublishingRecoverer
        return new MessagingMessageConverter() {

            @Override
            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer, Type type) {

                Message<?> message = super.toMessage(record, acknowledgment, consumer, type);
                return MessageBuilder.fromMessage(message)
                        .setHeader(KafkaHeaders.RAW_DATA, record)
                        .build();
            }

        };
    }

    @Bean
    KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    NewTopic topic1() {
        return TopicBuilder.name("so66982480").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so66982480-replies").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic3() {
        return TopicBuilder.name("so66982480.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
        return (msg, ex) -> {
            // The header is only present because deliveryAttemptHeader is enabled above
            if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
                // Retries exhausted: publish the raw record to the DLT and return the reply
                recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
                return "FAILED";
            }
            // Otherwise let the SeekToCurrentErrorHandler redeliver the record
            throw ex;
        };
    }

    @Bean
    DeadLetterPublishingRecoverer recoverer(KafkaOperations<String, String> template) {
        return new DeadLetterPublishingRecoverer(template);
    }

    @KafkaListener(id = "so66982480", topics = "so66982480", errorHandler = "eh")
    @SendTo
    public String listen(String in) {
        throw new RuntimeException("test");
    }

    @KafkaListener(id = "so66982480.DLT", topics = "so66982480.DLT")
    public void dlt(String in) {
        System.out.println("From DLT:" + in);
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            RequestReplyFuture<String, String, String> future =
                    template.sendAndReceive(new ProducerRecord<String, String>("so66982480", 0, null, "test"),
                            Duration.ofSeconds(30));
            System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
            System.out.println(future.get(30, TimeUnit.SECONDS).value());
        };
    }

}
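At runtime, the listener throws on every delivery: for attempts 1 through 9 the listener error handler re-throws, and the (default) SeekToCurrentErrorHandler seeks back and redelivers the record. On the 10th attempt the kafka_deliveryAttempt header exceeds 9, so the error handler publishes the raw record to so66982480.DLT and returns "FAILED", which @SendTo routes back to the ReplyingKafkaTemplate as the reply.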
With this in application.properties:

spring.kafka.consumer.auto-offset-reset=earliest

the relevant output is:

From DLT:test
FAILED
Upvotes: 3