Jeremy

Reputation: 41

Request/Reply and Retry Policy for Kafka Listeners

I’m currently implementing an error handling mechanism for Kafka listeners. This is what I would like to achieve: when an exception occurs within a Kafka listener, I want to retry processing the record 10 times and, if it still fails, send it to a dead letter queue and inform the sender in a request/reply scenario (for example, by returning a specific RecordFailure object).

I first defined an ErrorHandler bean, which worked fine but didn’t offer a way to return a value in case of failure. I then defined a KafkaListenerErrorHandler bean and specified it as a parameter of my Kafka listener. That allowed me to return a specific value, but I lost the retry and dead letter queue forwarding policy defined by the ErrorHandler.

Eventually, I configured my container factory with a RetryTemplate and a RecoveryCallback. I thought it would work as expected, but when using a ReplyingKafkaTemplate to send my message, I always end up with a timeout exception. I realised that, since I defined a RetryTemplate, the onMessage() method of the RetryingMessageListenerAdapter was called instead of the ReplyingKafkaTemplate one.

I’m now questioning the scenario itself: does it make sense to combine a RetryTemplate, a RecoveryCallback and a ReplyingKafkaTemplate to enable request/reply with retry policies using a Kafka listener? If so, what am I missing here?

Thank you for your time.

Upvotes: 0

Views: 1507

Answers (1)

Gary Russell

Reputation: 174769

Using a retry template at the listener level is mostly redundant now that the error handlers support backoff and retry.

One way to solve your particular problem would be to enable deliveryAttemptHeaders.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header

Then, in your listener error handler, check the header and, when the required number of attempts is reached, publish the message to a dead letter topic and return the error result. Before the count is reached, re-throw the exception so that the SeekToCurrentErrorHandler will redeliver the record.

Just make sure that the SeekToCurrentErrorHandler (STCEH) is configured with at least that many retries, so that it keeps redelivering the record and the listener error handler gets the chance to do its work.
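To make the interaction between the two handlers concrete, here is a minimal plain-Java simulation of the flow described above. It does not use any Spring Kafka classes: `deliver`, `listenerErrorHandler`, `deadLetters` and both attempt parameters are hypothetical names invented for this sketch, standing in for the container, the listener error handler, the dead letter topic and the two retry limits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Plain-Java simulation of the redelivery flow; no Spring Kafka classes are used. */
final class DeliveryAttemptSketch {

    /** Hypothetical stand-in for the dead letter topic. */
    static final List<String> deadLetters = new ArrayList<>();

    /**
     * Stand-in for the listener error handler: rethrow until the attempt
     * threshold is reached, then dead-letter the record and return a reply.
     */
    static String listenerErrorHandler(String payload, int deliveryAttempt,
            int replyAfterAttempts, RuntimeException ex) {
        if (deliveryAttempt >= replyAfterAttempts) {
            deadLetters.add(payload);
            return "FAILED";
        }
        throw ex;
    }

    /**
     * Stand-in for the container: redelivers a failing record until the
     * listener error handler returns a reply, or until the container's own
     * attempt limit is exhausted (empty result, meaning no reply is sent).
     */
    static Optional<String> deliver(String payload, int containerMaxAttempts, int replyAfterAttempts) {
        for (int attempt = 1; attempt <= containerMaxAttempts; attempt++) {
            try {
                // The listener always fails in this sketch.
                throw new RuntimeException("test");
            } catch (RuntimeException ex) {
                try {
                    return Optional.of(listenerErrorHandler(payload, attempt, replyAfterAttempts, ex));
                } catch (RuntimeException rethrown) {
                    // Container-level handler: go around the loop and redeliver.
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(deliver("test", 20, 10));  // Optional[FAILED]
        System.out.println(deadLetters);              // [test]
        System.out.println(deliver("other", 5, 10));  // Optional.empty
    }
}
```

The last call shows why the container-level limit matters: if the container gives up after 5 attempts while the listener error handler waits for the 10th, the handler never returns a value, so the requester gets no reply and would time out.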

EDIT

Here's an example showing how to use the DeadLetterPublishingRecoverer (DLPR) from a listener error handler, by adding the raw ConsumerRecord as a header:

@SpringBootApplication
public class So66982480Application {

    public static void main(String[] args) {
        SpringApplication.run(So66982480Application.class, args);
    }

    @Bean
    ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            KafkaTemplate<String, String> template) {

        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setReplyTemplate(template);
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so66982480-replies");
        container.getContainerProperties().setGroupId("so66982480-replies");
        return new ReplyingKafkaTemplate<>(pf, container);
    }

    @Bean
    RecordMessageConverter converter() {
        MessagingMessageConverter converter = new MessagingMessageConverter() {

            @Override
            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer, Type type) {

                Message<?> message = super.toMessage(record, acknowledgment, consumer, type);
                return MessageBuilder.fromMessage(message)
                        .setHeader(KafkaHeaders.RAW_DATA, record)
                        .build();
            }

        };
        return converter;
    }

    @Bean
    KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    NewTopic topic1() {
        return TopicBuilder.name("so66982480").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so66982480-replies").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic3() {
        return TopicBuilder.name("so66982480.DLT").partitions(1).replicas(1).build();
    }

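    @Bean
    KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
        return (msg, ex) -> {
            // On the 10th delivery attempt, publish the raw record to the DLT and reply with a failure value.
            if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
                recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
                return "FAILED";
            }
            // Otherwise rethrow so that the container's error handler redelivers the record.
            throw ex;
        };
    }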

    @Bean
    DeadLetterPublishingRecoverer recoverer(KafkaOperations<String, String> template) {
        return new DeadLetterPublishingRecoverer(template);
    }

    @KafkaListener(id = "so66982480", topics = "so66982480", errorHandler = "eh")
    @SendTo
    public String listen(String in) {
        throw new RuntimeException("test");
    }

    @KafkaListener(id = "so66982480.DLT", topics = "so66982480.DLT")
    public void dlt(String in) {
        System.out.println("From DLT:" + in);
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            RequestReplyFuture<String, String, String> future =
                    template.sendAndReceive(new ProducerRecord<String, String>("so66982480", 0, null, "test"),
                            Duration.ofSeconds(30));
            System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
            System.out.println(future.get(30, TimeUnit.SECONDS).value());
        };
    }

}

application.properties:

spring.kafka.consumer.auto-offset-reset=earliest

Output:

From DLT:test
FAILED

Upvotes: 3
