Reputation: 343
I am writing a Kafka consumer that should deserialize incoming Avro messages. I have the schema for the messages and was wondering what the best way is to deserialize them in vanilla Kafka.
I searched for a while, but all the examples I found cover file deserialization, like this one: http://avro.apache.org/docs/1.9.0/gettingstartedjava.html
They do not cover the Kafka message portion.
I used the Avro Maven plugin to convert my schema to a POJO class.
Any advice is appreciated!
Thank you
Upvotes: 4
Views: 5668
Reputation: 4460
Inspired by the answer from @user3693309
You can write a generic class like the one below.
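import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;

public class CustomAvroByteReader<T extends SpecificRecordBase> {
    // Assumes the Confluent wire format: 1 magic byte + 4-byte schema ID before the Avro payload.
    private static final int HEADER_SIZE = 5;
    private final SpecificDatumReader<T> reader;

    public CustomAvroByteReader(final Schema schema) {
        this.reader = new SpecificDatumReader<>(schema);
    }

    public T deserialize(byte[] bytes) throws IOException {
        // Decode in place, starting after the header, instead of copying the array.
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, HEADER_SIZE, bytes.length - HEADER_SIZE, null);
        return reader.read(null, decoder);
    }
}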
Then you can use it in your consumer like this:
private static final CustomAvroByteReader<Payment> AVRO_BYTE_READER =
        new CustomAvroByteReader<>(Payment.getClassSchema());
...
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
...
while (true) {
    final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
    for (final ConsumerRecord<String, byte[]> record : records) {
        final String key = record.key();
        Payment value = AVRO_BYTE_READER.deserialize(record.value());
        System.out.printf("key = %s, value = %s%n", key, value);
    }
}
Upvotes: 4
Reputation: 343
I ended up using the SpecificDatumReader for this.
The complete example can be found here: https://cwiki.apache.org/confluence/display/AVRO/FAQ.
The idea is that as long as you have the Avro schema (or the class that Avro tools generated from that schema), you can use the above approach to deserialize messages that arrive as byte[].
Upvotes: 3
Reputation: 191671
You need to reverse the process of the producer. If the producer used the schema registry serializer, then you will need the registry for the deserializer, even if you already have the schema yourself.
Without knowing the serializer logic, it will be difficult to write a deserializer.
The short answer is that you would use a BinaryDecoder, just like Confluent does.
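To make "reversing the producer" concrete: if the producer used Confluent's schema registry serializer, each message value starts with a 5-byte header (a zero magic byte followed by a 4-byte big-endian schema ID) and the Avro binary payload follows it. Below is a minimal sketch, using only the JDK, that splits such a message into its parts. The class name ConfluentFrame and its members are hypothetical names chosen for illustration, and the sketch assumes the producer really did use that wire format; if it did not, the header check will reject the message.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: splits a Confluent-framed message into schema ID and Avro payload. */
final class ConfluentFrame {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    final int schemaId;
    final byte[] payload;

    private ConfluentFrame(int schemaId, byte[] payload) {
        this.schemaId = schemaId;
        this.payload = payload;
    }

    static ConfluentFrame parse(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Message is shorter than the 5-byte header");
        }
        // ByteBuffer reads multi-byte values in big-endian order by default.
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        // Everything after the header is the Avro binary payload.
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return new ConfluentFrame(schemaId, payload);
    }
}
```

The payload can then be handed to a BinaryDecoder together with the schema you already have, while schemaId tells you which registry schema the producer actually wrote with. Checking the magic byte first is a cheap way to notice that the producer did not use this framing before Avro fails with a less obvious error.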
Upvotes: 4