Gourab27

Reputation: 17

How to configure DeadLetterPublishingRecoverer to send error messages to a DLQ in Spring Cloud Stream batch mode

I have created a Kafka consumer using Spring Cloud Stream and Spring Cloud Function that consumes messages in batch mode from a Kafka topic. Now I want to send failing batches to a Dead Letter Queue for further debugging.

I am handling retries inside my consumer method with Spring Retry, but for non-retryable exceptions I want to send the entire batch to a DLQ.

This is how my consumer looks:

@Bean
public Consumer<List<GenericRecord>> consume() {
    return (messages) -> {
        processMessage(messages);
    };
}

This is how the error-handling configuration looks:

@Autowired
private DefaultErrorHandler errorHandler;

ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer = new ListenerContainerCustomizer<AbstractMessageListenerContainer>() {
    @Override
    public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
        container.setCommonErrorHandler(errorHandler);
    }
};

The error handler is configured with a DeadLetterPublishingRecoverer to send the failed messages to a DLQ:

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Details> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
        (cr, e) -> new TopicPartition("error.topic.name", 0)),
        new FixedBackOff(0, 0));
}

But this is not sending any messages to the error topic, and from the error logs I can see that it is trying to connect to localhost:9092 instead of the broker I configured in spring.cloud.stream.kafka.binder.brokers.

How do I configure the DLQ provider to read the Kafka configuration from application.properties?

Also, is there a way to configure a Supplier function to create the DLQ provider?

Upvotes: 0

Views: 1498

Answers (2)

Makatun

Reputation: 1017

By default, DefaultErrorHandler seeks the unprocessed records back so they are redelivered from the original topic. You want to override that behavior:

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Object> dltTopicTemplate) {
    DeadLetterPublishingRecoverer dltRecoverer = new DeadLetterPublishingRecoverer(dltTopicTemplate);
    return new DefaultErrorHandler() {
        @Override
        public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
                Consumer<?, ?> consumer, MessageListenerContainer container) {
            // Publish the failed record to the DLT instead of re-seeking the topic
            dltRecoverer.accept(records.get(0), thrownException);
        }
    };
}

Upvotes: 0

Gary Russell

Reputation: 174759

You are probably using Boot's auto-configured KafkaTemplate.

Use spring.kafka.bootstrap-servers instead; the binder will use it when spring.cloud.stream.kafka.binder.brokers is not set, so both the binder and the template connect to the same broker.
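A minimal application.properties sketch of that arrangement (the broker address is a placeholder):

```properties
# Set the brokers once via Boot; both the auto-configured KafkaTemplate
# and the Spring Cloud Stream Kafka binder will pick this up.
spring.kafka.bootstrap-servers=my-broker:9092

# Deliberately do NOT set spring.cloud.stream.kafka.binder.brokers,
# so the binder falls back to spring.kafka.bootstrap-servers.
```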

You have to throw a BatchListenerFailedException to indicate which record in the batch failed.
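A sketch of what that can look like inside the batch consumer from the question (processMessage and the GenericRecord payload type are assumptions carried over from the question's code):

```java
@Bean
public Consumer<List<GenericRecord>> consume() {
    return messages -> {
        for (int i = 0; i < messages.size(); i++) {
            try {
                processMessage(messages.get(i));
            }
            catch (Exception e) {
                // Tells the error handler which record in the batch failed;
                // records before this index are treated as processed, and the
                // failed record can be recovered (e.g. published to the DLT).
                throw new BatchListenerFailedException("Failed to process record", e, i);
            }
        }
    };
}
```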

Upvotes: 0
