Heshan Karunaratne

Reputation: 1

How to consume Avro-serialized messages from AWS MSK via Apache Beam

PCollection<KafkaRecord<String, byte[]>> kafkaRecordPCollection =
        pipeline.apply(
            KafkaIO.<String, byte[]>read()
                .withBootstrapServers("bootstrap-server")
                .withTopic("topic")
                .withConsumerConfigUpdates(props)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class));

When I create the KafkaIO read in the above format, I'm able to get the data shown below.

KV{Struct{OC_NO=ABCDE3,PO_NO=11XA435A,S_ID=024,__dbz__physicalTableIdentifier=cdc.po.PO}, [3, 0, -93, -44, 46, 43, 44, -90, 68, 71, -96, 24, -105, 51, 107, -22, -27, 86, 0, 2, 12, 65, 66, 67, 68, 69, 51, 0, 2, 97, 22, 49, 46, 56, 46, 49, 46, 70, 105, 110, 97, 108, 20, 112, 111, 115, 116, 103, 114, 101, 115, 113, 108, 20, 99, 100, 99, 95, 115, 101, 114, 118, 101, 114, -6, -96, -63, -16, -28, 96, 0, 10, 102, 97, 108, 115, 101, 18, 75, 65, 70, 75, 65, 45, 80, 79, 67, 2, 66, 91, 34, 49, 50, 53, 53, 51, 52, 49, 56, 48, 56, 52, 53, 54, 34, 44, 34, 49, 50, 53, 53, 51, 56, 57, 55, 54, 52, 52, 56, 56, 34, 93, 26, 112, 117, 114, 99, 104, 97, 115, 101, 111, 114, 100, 101, 114, 28, 80, 85, 82, 67, 72, 65, 83, 69, 95, 79, 82, 68, 69, 82, 2, -66, -95, -57, -90, 15, 2, -112, -18, -4, -80, -119, 73, 0, 2, 117, 2, -116, -89, -63, -16, -28, 96, 0]}

But I need to deserialize the byte[] into a POJO class generated from the Avro schema, and when I try to use that class with withValueDeserializer() I'm getting errors. Is there a specific way to do this?

I have created a custom MyClassKafkaAvroDeserializer as well.

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyClassKafkaAvroDeserializer extends
    AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public Envelope deserialize(String topic, byte[] bytes) {
    // Delegates to AbstractKafkaAvroDeserializer, which expects the
    // Confluent wire format (magic byte 0 + 4-byte schema id).
    return (Envelope) this.deserialize(bytes);
  }

  @Override
  public void close() {
  }
}

PCollection<KafkaRecord<String, Envelope>> kafkaRecordPCollection =
        pipeline.apply(
            KafkaIO.<String, Envelope>read()
                .withBootstrapServers("bootstrap-server")
                .withTopic("topic")
                .withConsumerConfigUpdates(props)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(MyClassKafkaAvroDeserializer.class)
        );

This gives me an error like the one below:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Upvotes: 0

Views: 400

Answers (2)

robertwb

Reputation: 5104

If you're having trouble with deserializers, you can always follow your byte-producing Read with a Map that does the deserialization.
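
A minimal sketch of that approach, assuming Envelope is the Avro-generated class and that the value bytes are plain Avro binary (if the producer prepends schema-registry framing, those header bytes would have to be skipped first):

import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<String, Envelope>> envelopes =
    pipeline
        .apply(
            KafkaIO.<String, byte[]>read()
                .withBootstrapServers("bootstrap-server")
                .withTopic("topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withoutMetadata())
        .apply(
            MapElements.via(
                new SimpleFunction<KV<String, byte[]>, KV<String, Envelope>>() {
                  @Override
                  public KV<String, Envelope> apply(KV<String, byte[]> record) {
                    try {
                      // Decode the raw value bytes as plain Avro binary.
                      SpecificDatumReader<Envelope> reader =
                          new SpecificDatumReader<>(Envelope.class);
                      BinaryDecoder decoder =
                          DecoderFactory.get().binaryDecoder(record.getValue(), null);
                      return KV.of(record.getKey(), reader.read(null, decoder));
                    } catch (IOException e) {
                      throw new RuntimeException("Failed to decode Avro value", e);
                    }
                  }
                }))
        // Tell Beam how to encode the Avro-generated class between stages.
        .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Envelope.class)));

This keeps the Read itself trivial (String keys, raw byte[] values) and moves all Avro-specific logic into an ordinary transform, where it is easier to debug.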

Upvotes: 0

Alexey Romanenko

Reputation: 1443

I think you also need to provide a coder in this case, like:

KafkaIO.<String, Envelope>read()
    ...
    .withValueDeserializerAndCoder(
            MyClassKafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
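
Beam needs a Coder to materialize the deserialized values between pipeline stages, and it cannot infer one from a custom Deserializer, so the coder is supplied explicitly here. A sketch combining this suggestion with the read from the question (server, topic, and props carried over from there):

PCollection<KafkaRecord<String, Envelope>> kafkaRecordPCollection =
    pipeline.apply(
        KafkaIO.<String, Envelope>read()
            .withBootstrapServers("bootstrap-server")
            .withTopic("topic")
            .withConsumerConfigUpdates(props)
            .withKeyDeserializer(StringDeserializer.class)
            // Pair the custom deserializer with an explicit coder for Envelope.
            .withValueDeserializerAndCoder(
                MyClassKafkaAvroDeserializer.class, AvroCoder.of(Envelope.class)));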

Upvotes: 1
