Reputation: 567
I have an application using Spring Cloud Stream and Spring Kafka, which processes Avro messages. The application works fine, but now I'd like to add some error handling.
The Goal: I would like to catch deserialization exceptions, build a new object with the exception details + original Kafka message + custom context info, and push this object to a dedicated Kafka topic. Basically a DLQ, but the original message will be intercepted and decorated.
The Problem: While I can intercept the exception, I can't figure out how to acquire the original message from Kafka (TODO 1, below). I've been all through the data object returned in ConsumerAwareErrorHandler.handle and I don't see it there.
Below is the code I have:
@EnableBinding(EventStream.class)
@SpringBootApplication
@Slf4j
public class SpringcloudApplication {
public static void main(String[] args) {
SpringApplication.run(SpringcloudApplication.class, args);
}
/* Configure custom exception handler */
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
return (container, destination, group) -> {
container.setErrorHandler(new ConsumerAwareErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
log.info("Got error with data: {}", data);
// TODO 1 - How to get original message?
// TODO 2 - Send to dedicated (DLQ) topic
}
});
};
}
@StreamListener(EventStream.INBOUND)
public void consumeEvent(@Payload Message message) {
log.info("Consuming event --> {}", message.toString());
produceEvent(message);
}
@Autowired private EventStream eventStream;
public Boolean produceEvent(Message message) {
log.info("Producing event --> {}", message.toString());
return eventStream
.producer()
.send(MessageBuilder.withPayload(message)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
}
And the properties files:
spring:
cloud:
stream:
default-binder: kafka
default:
consumer:
useNativeEncoding: true
producer:
useNativeEncoding: true
kafka:
binder:
brokers: localhost:9092
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: "http://localhost:8081"
consumer-properties:
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
schema.registry.url: "http://localhost:8081"
specific.avro.reader: true
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
bindings:
event-consumer:
destination: data_stream_in # incoming topic
contentType: application/**avro
group: data_stream_consumer
event-producer:
destination: data_stream_out
contentType: application/**avro
I am using the following versions:
Any help is appreciated!
Upvotes: 2
Views: 2440
Reputation: 5904
The second argument in the handle
method is the ConsumerRecord
which is the original Kafka record, but if you want the record to be automatically sent to a DLQ you can do the following.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setErrorHandler(errorHandler);
};
}
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
Essentially, you are setting up a SeekToCurrentErrorHandler
which is capable of sending the failed record to the DLQ. See the ref docs for Spring for Apache Kafka for more details on how DeadLetterPublishingRecoverer
works: https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
;
More info on SeekToCurrentErrorHandler
:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current
you also need to configure and ErrorHandlingDeserializer,
spring.cloud.stream.kafka.binder.configuration.value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.binder.configuration.spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
...
similar for the value class.
More info on ErrorHandlingDeserializer
: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer
If you want to modify the record and add a custom message to DLQ, you can do that by overrding the handle method and then gain access to the ConsumerRecord
and then call the super class method.
Upvotes: 2