Reputation: 35
Is there any way to read message headers before deserializing?
I have written the below code, but I am forced to deserializing here, any way to not deserialize?
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
for (Header header : consumerRecord.headers()) {
if (header.key().equals("my header")) {
String data = "\n New record received .. \n" +
" Value: " + consumerRecord.value() +
" Topic: " + consumerRecord.topic() +
" Header: " + header.key() +
" Partition: " + consumerRecord.partition();
logger.info(data);
}
}
}
}
Upvotes: 2
Views: 10384
Reputation: 141
For the C# developers using the Confluent.Kafka library I pushed this solution from the context object:
public class KafkaByteDeserializer : IDeserializer<byte[]>
{
public static KafkaByteDeserializer Instance => new KafkaByteDeserializer();
public byte[] Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || IsDeleteOperation(context.Headers))
{
return default;
}
return data.ToArray();
}
private bool IsDeleteOperation(Headers headers)
{
// Your logic checking for the delete operation...
}
}
Upvotes: 1
Reputation: 191844
You need to write your own Deserializer implementation to be able to access headers before the data payload
Or...
any way to not deserialize
Set deserializer class to the ByteArrayDeserializer rather than the StringDeserializer.
Then, do the deserialization manually,
for (ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
for (Header header : consumerRecord.headers()) {
...
}
String s = new String(consumerRecord.value(), "UTF-8"); // for example
}
Upvotes: 5