Reputation: 1
PCollection<KafkaRecord<String, byte[]>> kafkaRecordPCollection =
    pipeline.apply(
        KafkaIO.<String, byte[]>read()
            .withBootstrapServers("bootstrap-server")
            .withTopic("topic")
            .withConsumerConfigUpdates(props)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class));
When I create the KafkaIO read as shown above, I get data in the following form:
KV{Struct{OC_NO=ABCDE3,PO_NO=11XA435A,S_ID=024,__dbz__physicalTableIdentifier=cdc.po.PO}, [3, 0, -93, -44, 46, 43, 44, -90, 68, 71, -96, 24, -105, 51, 107, -22, -27, 86, 0, 2, 12, 65, 66, 67, 68, 69, 51, 0, 2, 97, 22, 49, 46, 56, 46, 49, 46, 70, 105, 110, 97, 108, 20, 112, 111, 115, 116, 103, 114, 101, 115, 113, 108, 20, 99, 100, 99, 95, 115, 101, 114, 118, 101, 114, -6, -96, -63, -16, -28, 96, 0, 10, 102, 97, 108, 115, 101, 18, 75, 65, 70, 75, 65, 45, 80, 79, 67, 2, 66, 91, 34, 49, 50, 53, 53, 51, 52, 49, 56, 48, 56, 52, 53, 54, 34, 44, 34, 49, 50, 53, 53, 51, 56, 57, 55, 54, 52, 52, 56, 56, 34, 93, 26, 112, 117, 114, 99, 104, 97, 115, 101, 111, 114, 100, 101, 114, 28, 80, 85, 82, 67, 72, 65, 83, 69, 95, 79, 82, 68, 69, 82, 2, -66, -95, -57, -90, 15, 2, -112, -18, -4, -80, -119, 73, 0, 2, 117, 2, -116, -89, -63, -16, -28, 96, 0]}
However, I need to deserialize the byte[] into a POJO class generated from an Avro schema, and when I try to use that class with withValueDeserializer() I get errors. Is there a specific way to do this?
I have also created a custom MyClassKafkaAvroDeserializer:
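import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;

@Slf4j
public class MyClassKafkaAvroDeserializer extends
        AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public Envelope deserialize(String s, byte[] bytes) {
        // Delegates to the Confluent base class, which expects its own wire format.
        return (Envelope) this.deserialize(bytes);
    }

    @Override
    public void close() {
    }
}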
PCollection<KafkaRecord<String, Envelope>> kafkaRecordPCollection =
    pipeline.apply(
        KafkaIO.<String, Envelope>read()
            .withBootstrapServers("bootstrap-server")
            .withTopic("topic")
            .withConsumerConfigUpdates(props)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(MyClassKafkaAvroDeserializer.class)
    );
This gives me the following error:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Upvotes: 0
Views: 400
Reputation: 5104
If you're having trouble with deserializers, you can always follow your byte-producing Read with a Map that does the deserialization.
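Doing the decoding yourself also lets you check the raw bytes before handing them to an Avro decoder. The "Unknown magic byte!" error comes from the Confluent deserializer when the first byte of a value is not 0, and the sample value in the question starts with 3. Below is a minimal sketch of a check such a Map step could call, assuming the Confluent framing (one magic byte 0, a 4-byte big-endian schema id, then the Avro body). The WireFormat class and its method names are hypothetical helpers, not part of Beam or the Confluent client.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of Beam or the Confluent client.
final class WireFormat {
    // Assumed Confluent framing: magic byte 0, 4-byte big-endian schema id, Avro body.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    private WireFormat() {}

    // True if the value is long enough to hold the header and starts with the magic byte.
    static boolean isConfluentFramed(byte[] value) {
        return value != null && value.length >= HEADER_LENGTH && value[0] == MAGIC_BYTE;
    }

    // Reads the schema id stored in bytes 1..4 of the header.
    static int schemaId(byte[] value) {
        requireFramed(value);
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    // Returns a copy of the bytes after the header, i.e. the Avro-encoded body.
    static byte[] avroPayload(byte[] value) {
        requireFramed(value);
        return Arrays.copyOfRange(value, HEADER_LENGTH, value.length);
    }

    private static void requireFramed(byte[] value) {
        if (!isConfluentFramed(value)) {
            throw new IllegalArgumentException(
                "Value does not start with the assumed header (magic byte 0 + 4-byte schema id)");
        }
    }
}
```

For the value shown in the question, isConfluentFramed would return false because the first byte is 3. That is consistent with the exception and may mean the topic was not written with the Confluent serializer, in which case the Map step would need whatever decoder matches the producer.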
Upvotes: 0
Reputation: 1443
I think you also need to provide a coder in this case, like:
KafkaIO.<String, Envelope>read()
    ...
    .withValueDeserializerAndCoder(
        MyClassKafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
Upvotes: 1