Reputation: 4004
I am using a Spring Kafka consumer with an Avro schema to build my application.
However, if a message can't be deserialized into the Avro specific record I built, the consumer keeps retrying the same message over and over again (infinite retry).
In this case, how can I configure the consumer application to skip the current message and move on to the next offset when a deserialization exception occurs?
I have looked at the Spring Kafka error handler, but it only handles exceptions thrown in the listener, not those thrown during the deserialization stage.
My consumer application is very simple:
@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
}
Based on this code, the received message sometimes cannot be deserialized into the correct Customer object.
I also saw that a recent solution is to use Spring Kafka's ErrorHandlingDeserializer2 to handle this, but since I am using KafkaAvroDeserializer, how do I work out those configs? My current config is:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
Upvotes: 6
Views: 4406
Reputation: 2453
You need to set both the key and value deserializers to ErrorHandlingDeserializer.class, and then pass your current key and value deserializers as its delegates through the ErrorHandlingDeserializer key/value delegate properties.
It will look something like this:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
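For the setup in the question (String keys, Avro values), the same wiring can be sketched with plain string keys and class names, which works because the delegate property accepts either a class or a class name. This is only a sketch: the class `ErrorHandlingConsumerProps` is a hypothetical name, and the string literals are what I believe `ConsumerConfig.*_DESERIALIZER_CLASS_CONFIG` and `ErrorHandlingDeserializer.*_DESERIALIZER_CLASS` resolve to, so check them against your library versions. On older Spring Kafka versions the wrapper class is named ErrorHandlingDeserializer2 instead.

```java
import java.util.HashMap;
import java.util.Map;

public class ErrorHandlingConsumerProps {

    // Assumed fully qualified name of the wrapper; use ErrorHandlingDeserializer2 on older versions.
    static final String WRAPPER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer";

    /** Builds only the deserializer-related consumer properties. */
    public static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // The Kafka consumer itself only sees the error-handling wrapper.
        props.put("key.deserializer", WRAPPER);
        props.put("value.deserializer", WRAPPER);
        // The wrapper delegates to the deserializers from the original config:
        // String for the key, Avro for the value.
        props.put("spring.deserializer.key.delegate.class",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("spring.deserializer.value.delegate.class",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The remaining properties (bootstrap servers, schema registry URL, and so on) stay as they were; the resulting map can be passed to DefaultKafkaConsumerFactory as in the snippet above.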
Upvotes: 4
Reputation: 174554
It's explained in the documentation.
You set the deserializer to the error handling deserializer and its delegate via the custom Spring property.
You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer2 instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer2) to instantiate the delegates. The property names are ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
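To see why wrapping the delegate stops the retry loop, here is a minimal, library-free sketch of the delegate pattern. It is not Spring Kafka's implementation: `SafeDeserializerSketch`, `Result` and `wrap` are hypothetical names, and an integer parser stands in for the Avro deserializer. The point is only that the wrapper catches the delegate's exception and hands back a result instead of throwing, so the consumer can move past the bad record.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SafeDeserializerSketch {

    /** Outcome of one deserialization attempt: a value or the captured failure. */
    public static final class Result<T> {
        public final T value;
        public final Exception error;

        Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }

        public boolean failed() {
            return error != null;
        }
    }

    /** Wraps a delegate so that a failure is captured rather than thrown. */
    public static <T> Function<byte[], Result<T>> wrap(Function<byte[], T> delegate) {
        return bytes -> {
            try {
                return new Result<>(delegate.apply(bytes), null);
            } catch (RuntimeException e) {
                // Swallow the failure here so the caller can skip this record.
                return new Result<>(null, e);
            }
        };
    }

    public static void main(String[] args) {
        // Stand-in delegate: parses UTF-8 bytes as an integer, throws on bad input.
        Function<byte[], Integer> parseInt =
                b -> Integer.parseInt(new String(b, StandardCharsets.UTF_8));
        Function<byte[], Result<Integer>> safe = wrap(parseInt);

        for (String payload : new String[] {"1", "oops", "3"}) {
            Result<Integer> r = safe.apply(payload.getBytes(StandardCharsets.UTF_8));
            System.out.println(r.failed()
                    ? "skipped bad record: " + payload
                    : "processed: " + r.value);
        }
    }
}
```

In Spring Kafka, as I understand it, the error-handling deserializer plays the role of `wrap`: the failure is passed on to the container's error handler instead of being thrown from the consumer's poll, which is what allows the offset to advance.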
Upvotes: 0