Reputation: 36372
I am using Kafka to consume messages. Some of the messages I receive may be in a different format, which causes a DeserializationException. I want to skip the records that cause a DeserializationException and process the ones that deserialize without issues.
All Kafka-related settings are configured through properties, as shown below:
kafka:
  producer:
    bootstrap-servers:
      - PRODUCER_BROKERS
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
  consumer:
    key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    bootstrap-servers:
      - CONSUMER_BROKERS
    properties:
      key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
When I searched, I found a solution that implements an ErrorHandler (from "How to catch deserialization error in Kafka-Spring?"), but since I am configuring everything through properties, I am not sure how to bind it to the ConcurrentKafkaListenerContainerFactory. What is a better approach?
Upvotes: 0
Views: 3024
Reputation: 174484
Your configuration looks correct.
The default error handler (DefaultErrorHandler) will log and discard records that fail deserialization.
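To make the skip-and-continue behavior concrete, here is a minimal, library-free sketch of the idea. It is only a simplified model, not Spring Kafka's actual implementation: the class `SkipBadRecordsSketch`, the `Result` holder, and the `guarded` and `consume` helpers are hypothetical names invented for illustration, and an integer parser stands in for the Avro delegate. The wrapper plays a role loosely similar to ErrorHandlingDeserializer (catch the failure instead of throwing), and the loop plays a role loosely similar to the error handler (log the bad record and move on).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model; none of these names come from Spring Kafka.
class SkipBadRecordsSketch {

    /** Holds either a deserialized value or the failure that occurred. */
    static final class Result<T> {
        final T value;
        final Exception failure;

        Result(T value, Exception failure) {
            this.value = value;
            this.failure = failure;
        }

        boolean failed() {
            return failure != null;
        }
    }

    /** Wraps a delegate so a throwing deserializer yields a failed Result instead of propagating. */
    static <T> Function<byte[], Result<T>> guarded(Function<byte[], T> delegate) {
        return bytes -> {
            try {
                return new Result<>(delegate.apply(bytes), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    /** Processes every payload that deserializes; logs and skips the ones that do not. */
    static List<Integer> consume(List<byte[]> payloads) {
        // Stand-in delegate: parses the payload as a UTF-8 encoded integer.
        Function<byte[], Result<Integer>> deserializer =
                guarded(bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));

        List<Integer> processed = new ArrayList<>();
        for (byte[] payload : payloads) {
            Result<Integer> result = deserializer.apply(payload);
            if (result.failed()) {
                System.err.println("Skipping record: " + result.failure());
                continue; // discard the bad record and keep consuming
            }
            processed.add(result.value);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<byte[]> payloads = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "oops".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(payloads)); // prints [1, 3]
    }
}
```

The design point is that the failure is captured at deserialization time rather than thrown out of the poll loop, so a single bad record cannot block the records behind it.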
Upvotes: 1