Reputation: 12431
I try to use Kafka with JSON as format for the value. To handle the deserialization in my consumer automatically and also to have a logging I set it up for JsonDeserializer and an custom ErrorHandler.
application.properties
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class = org.springframework.kafka.support.serializer.JsonDeserializer
KafkaConfig.java
@Configuration
@EnableKafka
public class KafkaConfig
{
@Bean
public DefaultErrorHandler errorHandler (KafkaTemplate <String, String> template)
{
return new DefaultErrorHandler ()
{
@Override
public void handleRemaining (Exception ex, java.util.List<ConsumerRecord <?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container)
{
// here we do logging stuff
System.out.println ("##: " + records.size ());
System.out.println ("ex: " + ex.getMessage());
ConsumerRecord <?, ?> cr = records.get (0);
System.out.println ("ku: " + KafkaUtils.format (cr));
System.out.println ("st: " + cr.toString ());
System.out.println ("key: " + cr.key ());
System.out.println ("val: " + cr.value ());
System.out.println ("vsz: "+ cr.serializedValueSize());
}
};
}
So far, so good - all is working.
Now I think of a way to the the value as a string to have it in my logging.
How can I get that out of ConsumerRecord?
Output so far is this in case of a wrong message. I simply test with kafka-producer.sh script that is part of kafka and let me send messages by the console. vsz matches my input - so can not be completely wrong.
##: 1
ex: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
ku: mytopic-0@82
st: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = 0, offset = 82, CreateTime = 1650039248945, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19,..., 112, 1 100, 102])], isReadOnly = false), key = null, value = null)
key: null
val: null
vsz: 3
My idea is to create a StringSerializer - but cr.value () is null - so thats a dead end.
Upvotes: 0
Views: 2008
Reputation: 174719
You must call super.handleRemaining()
in your overridden method so that the records are properly repositioned for the next poll.
The value that could not be deserialized is available in the DeserializationException
, which is the cause of the ListenerExecutionFailedExeption
passed into the error handler.
/**
* Get the data that failed deserialization (value or key).
* @return the data.
*/
public byte[] getData() {
return this.data; // NOSONAR array reference
}
Upvotes: 1