Lolly

Reputation: 36372

Apache Kafka Consumer Deserialization Error

I am using Kafka to consume messages. Some of the messages I receive may be in an unexpected format and cause a DeserializationException. I want to skip the records that cause a DeserializationException and process the ones that deserialize without issues.

All Kafka-related settings are configured through properties, as shown below:

kafka:
    producer:
      bootstrap-servers:
        - PRODUCER_BROKERS
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      bootstrap-servers:
        - CONSUMER_BROKERS
      properties:
        key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer

When I searched, I found a solution that implements an ErrorHandler (How to catch deserialization error in Kafka-Spring?), but since I am configuring everything through properties, I am not sure how to bind it to the ConcurrentKafkaListenerContainerFactory. What is a better approach?

Upvotes: 0

Views: 3024

Answers (1)

Gary Russell

Reputation: 174484

Your configuration looks correct.

The default error handler (DefaultErrorHandler) does not retry records that fail deserialization; it logs the failure and discards the record, so the consumer moves on to the next one.
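
To illustrate the behavior, here is a minimal plain-Java sketch of the pattern, not Spring Kafka's actual implementation. The class and method names (SkipBadRecordsSketch, safeDeserialize, consume, parseInt) are hypothetical: safeDeserialize stands in for what ErrorHandlingDeserializer does around its delegate, and consume stands in for the listener container plus error handler.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical model of the pattern; these are NOT Spring Kafka classes.
class SkipBadRecordsSketch {

    // Outcome of one deserialization attempt: a value, or the captured failure.
    static final class Result<T> {
        final T value;
        final Exception error;

        Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    // Stand-in for ErrorHandlingDeserializer: call the delegate and capture any
    // exception instead of letting it escape from the consumer's poll.
    static <T> Result<T> safeDeserialize(Function<byte[], T> delegate, byte[] payload) {
        try {
            return new Result<>(delegate.apply(payload), null);
        } catch (RuntimeException e) {
            return new Result<>(null, e);
        }
    }

    // Stand-in for the container plus error handler: log and skip failed records,
    // pass the rest on to the listener (here they are simply collected).
    static <T> List<T> consume(List<byte[]> payloads, Function<byte[], T> delegate) {
        List<T> processed = new ArrayList<>();
        for (int offset = 0; offset < payloads.size(); offset++) {
            Result<T> result = safeDeserialize(delegate, payloads.get(offset));
            if (result.failed()) {
                System.err.println("Skipping record at offset " + offset + ": " + result.error);
                continue;
            }
            processed.add(result.value);
        }
        return processed;
    }

    // Hypothetical delegate deserializer: parses the payload as a decimal integer.
    static Integer parseInt(byte[] payload) {
        return Integer.valueOf(new String(payload, StandardCharsets.UTF_8));
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> payloads = Arrays.asList(bytes("1"), bytes("oops"), bytes("3"));
        // The malformed middle record is logged and skipped; the others are processed.
        System.out.println(consume(payloads, SkipBadRecordsSketch::parseInt));
    }
}
```

If you need custom handling instead of the default logging, recent Spring Boot versions (2.6 and later, as far as I know) should pick up a CommonErrorHandler bean and apply it to the auto-configured ConcurrentKafkaListenerContainerFactory, so the properties-based setup can stay as it is.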

Upvotes: 1
