chris01

Reputation: 12431

How to get the serialized String from ConsumerRecord?

I am trying to use Kafka with JSON as the value format. To handle deserialization in my consumer automatically and also to have logging, I set it up with JsonDeserializer and a custom ErrorHandler.

application.properties

spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class = org.springframework.kafka.support.serializer.JsonDeserializer

KafkaConfig.java

@Configuration
@EnableKafka
public class KafkaConfig 
{
    @Bean
    public DefaultErrorHandler errorHandler (KafkaTemplate <String, String> template) 
    {
        return new DefaultErrorHandler ()
        {
            @Override
            public void handleRemaining (Exception ex, java.util.List<ConsumerRecord <?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) 
            {
              // here we do logging stuff
              System.out.println ("##: " + records.size ());
              System.out.println ("ex: " + ex.getMessage());
              ConsumerRecord <?, ?> cr = records.get (0);
              System.out.println ("ku: " + KafkaUtils.format (cr));
              System.out.println ("st: " + cr.toString ());
              System.out.println ("key: " + cr.key ());
              System.out.println ("val: " + cr.value ());
              System.out.println ("vsz: "+ cr.serializedValueSize());
            }
        };
    }
}

So far, so good - all is working.

Now I am looking for a way to get the value as a string so I can include it in my logging.

How can I get that out of ConsumerRecord?

This is the output so far for a malformed message. I test with the kafka-console-producer.sh script that ships with Kafka and lets me send messages from the console. vsz matches the length of my input, so it cannot be completely wrong.

##: 1
ex: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
ku: mytopic-0@82
st: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = 0, offset = 82, CreateTime = 1650039248945, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19,..., 112, 1 100, 102])], isReadOnly = false), key = null, value = null)
key: null
val: null
vsz: 3

My idea was to use a StringSerializer, but cr.value() is null, so that's a dead end.

Upvotes: 0

Views: 2008

Answers (1)

Gary Russell

Reputation: 174719

You must call super.handleRemaining() in your overridden method so that the records are properly repositioned for the next poll.

The value that could not be deserialized is available in the DeserializationException, which is the cause of the ListenerExecutionFailedException passed into the error handler.

/**
 * Get the data that failed deserialization (value or key).
 * @return the data.
 */
public byte[] getData() {
    return this.data; // NOSONAR array reference
}
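
As a rough sketch of how that could be used for logging: walk the cause chain of the exception passed to the error handler, and when a DeserializationException is found, decode the bytes from getData() into a String. To keep the snippet compilable on its own, it uses a hypothetical stand-in class, FakeDeserializationException; in a real application you would check for org.springframework.kafka.support.serializer.DeserializationException instead. The class name RawValueLogging, the helper name rawValue, and the choice of UTF-8 are assumptions, not part of Spring Kafka.

```java
import java.nio.charset.StandardCharsets;

public class RawValueLogging {

    /**
     * Hypothetical stand-in for Spring Kafka's DeserializationException,
     * present only so this sketch compiles without the library.
     */
    public static class FakeDeserializationException extends RuntimeException {
        private final byte[] data;

        public FakeDeserializationException(String message, byte[] data) {
            super(message);
            this.data = data;
        }

        public byte[] getData() {
            return this.data;
        }
    }

    /**
     * Walks the cause chain and returns the raw payload as text,
     * or null if no deserialization failure is found.
     * Assumes the producer sent UTF-8 encoded text.
     */
    public static String rawValue(Throwable ex) {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (t instanceof FakeDeserializationException) {
                byte[] data = ((FakeDeserializationException) t).getData();
                return data == null ? null : new String(data, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Simulates the wrapper exception the error handler receives.
        Exception wrapped = new RuntimeException("Listener failed",
                new FakeDeserializationException("failed to deserialize",
                        "abc".getBytes(StandardCharsets.UTF_8)));
        System.out.println("val: " + rawValue(wrapped)); // prints "val: abc"
    }
}
```

Inside the overridden handleRemaining, a helper like this would be called with the ex argument to log the payload, followed by super.handleRemaining(ex, records, consumer, container) so the records are still repositioned.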

Upvotes: 1
