Amjad Khader

Reputation: 35

Kafka consumer - Reading message header before deserializing

Is there any way to read message headers before deserializing?

I have written the code below, but it forces me to deserialize the value. Is there a way to avoid deserializing it?

    while (true) {

        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            for (Header header : consumerRecord.headers()) {
                if (header.key().equals("my header")) {
                    String data = "\n New record received .. \n" +
                            " Value: " + consumerRecord.value() +
                            " Topic: " + consumerRecord.topic() +
                            " Header: " + header.key() +
                            " Partition: " + consumerRecord.partition();

                    logger.info(data);
                }
            }
        }
    }

Upvotes: 2

Views: 10384

Answers (2)

Pedro Henrique

Reputation: 141

For C# developers using the Confluent.Kafka library, I used this solution, which reads the headers from the context object:

    using System;
    using Confluent.Kafka;

    public class KafkaByteDeserializer : IDeserializer<byte[]>
    {
        // Stateless, so a single shared instance is enough.
        public static KafkaByteDeserializer Instance { get; } = new KafkaByteDeserializer();

        public byte[] Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            // The headers arrive on the context, so they can be checked before the payload is copied.
            if (isNull || IsDeleteOperation(context.Headers))
            {
                return default;
            }

            return data.ToArray();
        }

        private bool IsDeleteOperation(Headers headers)
        {
            // Your logic checking for the delete operation...
            throw new NotImplementedException();
        }
    }

Upvotes: 1

OneCricketeer

Reputation: 191844

You need to write your own Deserializer implementation to access the headers before the data payload is deserialized.
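As a rough illustration of the custom-deserializer route, the sketch below overrides the `deserialize` overload that receives the record headers. It assumes a reasonably recent kafka-clients version in which `Deserializer` has that overload and default `configure`/`close` methods. The class name `HeaderAwareStringDeserializer` is hypothetical, and returning `null` when the header is missing is just one possible policy.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical class name; "my header" is the header key used in the question.
public class HeaderAwareStringDeserializer implements Deserializer<String> {

    private static final String WANTED_HEADER = "my header";

    // Fallback for callers that do not supply headers: always decode.
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    // Overload that receives the record headers alongside the raw bytes.
    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        Header wanted = headers.lastHeader(WANTED_HEADER);
        if (wanted == null) {
            // Header absent: skip decoding the payload entirely (assumed policy).
            return null;
        }
        return deserialize(topic, data);
    }
}
```

To try it, the class name would be set as the consumer's `value.deserializer`. Records without the header then show up with a `null` value, so the polling loop needs to tolerate that.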

Or...

"any way to not deserialize"

Set the deserializer class to ByteArrayDeserializer rather than StringDeserializer.
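A minimal sketch of that configuration, using plain string property keys so it needs nothing beyond the JDK. The helper class `RawBytesConsumerConfig` and its parameters are hypothetical; only the two deserializer settings matter here.

```java
import java.util.Properties;

// Hypothetical helper; builds consumer properties that keep keys and values as raw byte[].
public class RawBytesConsumerConfig {

    private static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // No String decoding happens inside the consumer with these settings.
        props.setProperty("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return props;
    }
}
```

The resulting properties would be passed to a `KafkaConsumer<byte[], byte[]>`, so each record's value stays a byte array until you decide to decode it.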

Then do the deserialization manually, after you have looked at the headers:

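    import java.nio.charset.StandardCharsets;

    for (ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
        for (Header header : consumerRecord.headers()) {
            ...
        }
        // Decode only once you actually need the payload; UTF-8 is just an example.
        String s = new String(consumerRecord.value(), StandardCharsets.UTF_8);
    }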

Upvotes: 5
