Reputation: 1727
I'm using spring boot 2.1.7.RELEASE and spring-kafka 2.2.8.RELEASE.And I'm using @KafkaListener annotation to create a consumer and I'm using all default settings for the consumer.And I'm using below configuration as specified in the Spring-Kafka documentation.
// other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
Now, My requirements are when there is a deserialization exception. Please suggest how can i do this.
Upvotes: 1
Views: 586
Reputation: 174494
Add a SeekToCurrentErrorHandler
, configured with a DeadLetterPublishingRecoverer
to the container.
See the documentation and here.
Deserialization exceptions are treated as fatal and will not be retried.
If you want to write your own error handler for some reason, the code in the DeadLetterPublishingRecoverer
shows how the exception information is extracted.
Upvotes: 1