user3693309

Reputation: 343

Deserializing a Kafka message without Schema Registry

I am writing a Kafka consumer that should deserialize incoming Avro messages. I have the schema for the messages and am wondering what the best way is to deserialize them in vanilla Kafka (without Schema Registry).

I searched for a while, but all the examples I found cover file deserialization, like this one: http://avro.apache.org/docs/1.9.0/gettingstartedjava.html. None of them cover the Kafka message part.

I used the Avro Maven plugin to generate a POJO class from my schema.

Any advice is appreciated!

Thank you

Upvotes: 4

Views: 5668

Answers (3)

Jude Niroshan

Reputation: 4460

Inspired by the answer from @user3693309

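You can write a generic class like the one below. It assumes the producer used Confluent's Avro serializer, which prefixes every message with a 5-byte header that must be skipped before decoding.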

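import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;

public class CustomAvroByteReader<T extends SpecificRecordBase> {
    // Confluent wire format: 1 magic byte (0x0) + 4-byte schema ID, then the Avro payload.
    private static final int HEADER_LENGTH = 5;

    private final SpecificDatumReader<T> reader;

    public CustomAvroByteReader(final Schema schema) {
        this.reader = new SpecificDatumReader<>(schema);
    }

    public T deserialize(final byte[] bytes) throws IOException {
        // Decode starting just past the header; no need to copy the array.
        final Decoder decoder = DecoderFactory.get()
                .binaryDecoder(bytes, HEADER_LENGTH, bytes.length - HEADER_LENGTH, null);
        return reader.read(null, decoder);
    }
}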

Then you can use it in your consumer like this:

private static final CustomAvroByteReader<Payment> AVRO_BYTE_READER = new CustomAvroByteReader<>(Payment.getClassSchema());

...

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

...
while (true) {
    final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
    for (final ConsumerRecord<String, byte[]> record : records) {
        final String key = record.key();
        // deserialize() throws IOException: handle it here or declare it on the enclosing method
        final Payment value = AVRO_BYTE_READER.deserialize(record.value());

        System.out.printf("key = %s, value = %s%n", key, value);
    }
}
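The reader above skips the first 5 bytes unconditionally, so a message that was not written by Confluent's serializer could be silently mis-decoded. Below is a minimal, stdlib-only sketch of checking the header first. It assumes the Confluent wire format (magic byte `0x0` followed by a 4-byte big-endian schema ID); the class `ConfluentHeader` and its methods are hypothetical names for illustration.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical helper: validates the 5-byte Confluent wire-format header
 * (magic byte 0x0 + 4-byte big-endian schema ID) before the Avro payload is decoded.
 */
public final class ConfluentHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    private ConfluentHeader() {}

    /** Returns the schema ID, or throws if the bytes are not Confluent-framed. */
    public static int schemaId(byte[] bytes) {
        if (bytes == null || bytes.length < HEADER_LENGTH || bytes[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        // ByteBuffer reads big-endian by default; start at index 1, read 4 bytes.
        return ByteBuffer.wrap(bytes, 1, 4).getInt();
    }

    /** Returns a copy of the Avro payload that follows the header. */
    public static byte[] payload(byte[] bytes) {
        schemaId(bytes); // throws if the header is missing or malformed
        byte[] out = new byte[bytes.length - HEADER_LENGTH];
        System.arraycopy(bytes, HEADER_LENGTH, out, 0, out.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] framed = {0x0, 0x0, 0x0, 0x1, 0x2C, 0x02, 0x04};
        System.out.println(schemaId(framed));       // prints 300
        System.out.println(payload(framed).length); // prints 2
    }
}
```

In the consumer loop you could call `ConfluentHeader.schemaId(record.value())` before `AVRO_BYTE_READER.deserialize(...)` to reject unexpected payloads early.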

Upvotes: 4

user3693309

Reputation: 343

I ended up using the SpecificDatumReader for this.

The complete example can be found here https://cwiki.apache.org/confluence/display/AVRO/FAQ.

The idea is that as long as you have the Avro schema (or the class generated from it by the Avro tools), you can use this approach to deserialize messages that arrive as byte[].

Upvotes: 3

OneCricketeer

Reputation: 191671

You need to reverse the process of the producer. If the producer used the Schema Registry serializer, then you will need the registry for the deserializer as well, even if you have the schema yourself.

Without knowing that serializer logic, it'll be difficult to write a deserializer.

The short answer is that you would use a BinaryDecoder, just like Confluent's deserializer does.
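To illustrate why the schema alone is enough, here is a stdlib-only sketch of what a BinaryDecoder does for a hypothetical record with a `long` field followed by a `string` field. Per the Avro specification, `int` and `long` values are zig-zag variable-length encoded, a `string` is a `long` byte count followed by UTF-8 bytes, and record fields are written in schema order with no names or tags. This is an illustration only, not Avro's implementation; in real code use Avro's own `BinaryDecoder`. The class `AvroBinarySketch` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch of Avro binary decoding for this assumed schema:
 *   {"type": "record", "name": "Example",
 *    "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}]}
 * The bytes carry no field names, so fields are read in schema order.
 */
public final class AvroBinarySketch {
    private final byte[] buf;
    private int pos;

    /** offset lets you start past a header, e.g. 5 for Confluent-framed bytes. */
    public AvroBinarySketch(byte[] buf, int offset) {
        this.buf = buf;
        this.pos = offset;
    }

    /** Reads a zig-zag varint (Avro "long"; "int" uses the same encoding). */
    public long readLong() {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry data
            shift += 7;
        } while ((b & 0x80) != 0);             // high bit set: more bytes follow
        return (raw >>> 1) ^ -(raw & 1);       // undo zig-zag
    }

    /** Reads an Avro "string": a long byte count followed by UTF-8 bytes. */
    public String readString() {
        int len = (int) readLong();
        String s = new String(buf, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    public static void main(String[] args) {
        // id = -2 (zig-zag 3), name = "ab" (length 2, zig-zag 4)
        byte[] record = {0x03, 0x04, 'a', 'b'};
        AvroBinarySketch in = new AvroBinarySketch(record, 0);
        long id = in.readLong();
        String name = in.readString();
        System.out.println(id + " " + name); // prints -2 ab
    }
}
```

Without the schema, the reader would not know that the first varint is a `long` and the next is a string length, which is why the producer's schema (or the registry's copy of it) is required.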

Upvotes: 4
