Reputation: 17
I have created a Kafka Consumer using Spring Cloud Stream and Spring Cloud Function for consuming messages in batch mode from a Kafka topic. Now, I want to send the error batches to a Dead Letter Queue for further debugging of the error.
I am handling retries inside my consumer method with Spring retry. But for non retry-able Exceptions I am looking to send the entire batch to a DLQ.
This is how my consumer looks like:
@Bean
public Consumer<List<GenericRecord>> consume() {
return (message) -> {
processMessage(message);
}
}
This is how the error handling Configuration looks like:
@Autowired
private DefaultErrorHandler errorHandler;
ListenerContainerCustomizer<AbstractMessageListenerContainer> c = new ListenerContainerCustomizer<AbstractMessageListenerContainer>() {
@Override
public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
container.setCommonErrorHandler(errorHandler);
}
}
The Error Handler is enabled with a DeadRecordPublishinRecoverer to send the failed messages to a DLQ:
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Details> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(cr, e) -> new TopicPartition("error.topic.name", 0)),
new FixedBackOff(0, 0));
}
But this is not sending any message to the error.topic, And from the error logs I can see that it's trying to connect to localhost:9092 instead of the broker I have mentioned in spring.cloud.stream.kafka.binder.brokers
.
How do I configure the DLQ provider to read the Kafka metadata from application.properties
?
Also is there a way to configure a Supplier
function to create the DLQ provider?
Upvotes: 0
Views: 1498
Reputation: 1017
DefaultErrorHandler returns messages to original topic. You want to override that.
DeadLetterPublishingRecoverer dltRecoverer = new DeadLetterPublishingRecoverer(dltTopicTemplate);
return new DefaultErrorHandler(){
@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
dltRecoverer.accept(records.get(0), thrownException);
}
};
Upvotes: 0
Reputation: 174759
You are probably using Boot's auto-configured KafkaTemplate
.
Use spring.kafka.bootstrap-servers
instead - the binder will use that if there is no spring.cloud.stream.kafka.binder.brokers
; that way both the binder and template will connect to the same broker.
You have to throw a BatchListenerFailedException
to indicate which record in the batch failed.
Upvotes: 0