Venkata Madhu

Reputation: 103

Kafka Streams with custom Avro Serde (without schema registry)

I have a stream processing application that uses the Avro message format. For serialization and deserialization (Serde) it uses io.confluent.kafka.streams.serdes.avro.GenericAvroSerde.
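For context, the current (registry-based) setup configures that serde roughly as follows. This is a minimal sketch; the registry URL is a placeholder:

```java
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

public class SerdeSetup {
    // Minimal sketch of the existing registry-based configuration.
    public static Serde<GenericRecord> valueSerde() {
        Serde<GenericRecord> serde = new GenericAvroSerde();
        serde.configure(
                Map.of("schema.registry.url", "http://localhost:8081"), // placeholder URL
                /* isKey = */ false);
        return serde;
    }
}
```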

I was trying to create a custom Avro Serde, something like the one below:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class CustomAvroSerde implements Serde<SpecificRecord> {

    Class<SpecificRecord> targetType;

    public CustomAvroSerde() {
         
    }

    public CustomAvroSerde(Class<SpecificRecord> targetType) {
        this.targetType = targetType;
    }

    @Override
    public Serializer<SpecificRecord> serializer() {
        return new Serializer<SpecificRecord>() {
            @Override
            public byte[] serialize(String s, SpecificRecord data) {
                byte[] result = null;
                if (data != null) {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    BinaryEncoder binaryEncoder =
                            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

                    DatumWriter<SpecificRecord> datumWriter = new SpecificDatumWriter<>(data.getSchema());

                    try {
                        datumWriter.write(data, binaryEncoder);
                        binaryEncoder.flush();
                        byteArrayOutputStream.close();
                    } catch (IOException ioException) {
                        ioException.printStackTrace();
                    }

                    result = byteArrayOutputStream.toByteArray();
                }
                return result;
            }
        };
    }

    @Override
    public Deserializer<SpecificRecord> deserializer() {
        return new Deserializer<SpecificRecord>() {
            @Override
            public SpecificRecord deserialize(String s, byte[] data) {
                SpecificRecord result = null;
                if (data != null) {
                    DatumReader<SpecificRecord> datumReader;
                    try {
                        datumReader = new SpecificDatumReader<>();
                        // also tried supplying the schema, as in the commented line below
                        // datumReader = new SpecificDatumReader<>(new Schema.Parser().parse("avro schema in string form"));
                        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                        result = datumReader.read(null, decoder);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                return result;
            }
        };
    }
}

Serialization works fine, but deserialization throws a NullPointerException, because the Serde has no reference to the Avro schema (targetType) during deserialization.

I will not have the schema details until runtime, so I am looking for a generalized way to supply the schema at deserialization.

The Confluent Serde initializes targetType through KafkaAvroDeserializer. How should this case be handled? Any clue?
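One workaround I am considering (a sketch of my own, not the Confluent wire format): since no registry is available, make each message self-describing by prefixing the binary payload with the writer schema JSON, then read the schema back before decoding and return a GenericRecord instead of a SpecificRecord:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SelfDescribingAvro {

    // Prefix the payload with the writer schema JSON (4-byte length + UTF-8 bytes).
    public static byte[] serialize(GenericRecord record) throws IOException {
        byte[] schemaBytes = record.getSchema().toString().getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream header = new DataOutputStream(out);
        header.writeInt(schemaBytes.length);
        header.write(schemaBytes);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Read the embedded writer schema back, then decode the remaining bytes with it.
    public static GenericRecord deserialize(byte[] bytes) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] schemaBytes = new byte[buf.getInt()];
        buf.get(schemaBytes);
        Schema writerSchema = new Schema.Parser()
                .parse(new String(schemaBytes, StandardCharsets.UTF_8));
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(bytes, buf.position(), buf.remaining(), null);
        return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
    }
}
```

This adds per-message overhead for the schema JSON, so it only makes sense for low-volume topics, but it removes any need for a compile-time schema or a registry lookup on the consumer side.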

Edited content
Changed serialization and deserialization to use SpecificRecord. After that change, it throws the NullPointerException below:

java.lang.NullPointerException
        at org.apache.avro.Schema.applyAliases(Schema.java:1787)
        at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:130)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:74)
        at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:56)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)

As noted in the comments in the code, I also tried passing a Schema to the SpecificDatumReader constructor. That attempt produced the ClassCastException below:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
    at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:74)
    at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:56)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)

I have also set the specific.avro.reader flag to true.
End of Edited content
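Regarding the ClassCastException: as far as I can tell, SpecificDatumReader falls back to decoding into GenericData.Record whenever no generated class matching the schema's full name is on the classpath, which matches my situation (no generated classes at runtime). This can be checked with SpecificData; the schema string here is a hypothetical example:

```java
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;

public class CheckSpecificClass {
    public static void main(String[] args) {
        // Hypothetical schema string; in the real app this only arrives at runtime.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"com.hcl.test\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
        // Returns the generated SpecificRecord class, or null if it is not on the
        // classpath; when null, SpecificDatumReader decodes into GenericData.Record,
        // and casting the result to SpecificRecord then fails.
        Class<?> generated = SpecificData.get().getClass(schema);
        System.out.println(generated); // null unless com.hcl.test.Example was generated
    }
}
```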

Thanks
Venkata

Upvotes: 1

Views: 2511

Answers (0)
