dubpp

Reputation: 95

Implementing custom AvroConverter for confluent kafka-connect-s3

I am using Confluent's Kafka Connect S3 sink connector to copy data from Apache Kafka to AWS S3.

The problem is that my Kafka data is in Avro format but was NOT written with Confluent Schema Registry's Avro serializer, and I cannot change the Kafka producer. So I need to deserialize the existing Avro data from Kafka and then persist it in Parquet format in AWS S3. I tried using Confluent's AvroConverter as the value converter, like this -

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro

And I am getting this error -

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

As far as I understand, "io.confluent.connect.avro.AvroConverter" will only work if the data was written to Kafka using Confluent Schema Registry's Avro serializer, which is why I am getting this error. So my question is: do I need to implement a generic AvroConverter in this case? And if so, how do I extend the existing source code - https://github.com/confluentinc/kafka-connect-storage-cloud?

Any help here will be appreciated.

Upvotes: 1

Views: 3877

Answers (1)

OneCricketeer

Reputation: 191743

You don't need to extend that repo. You just need to implement a Converter (part of Apache Kafka), shade it into a JAR, then place it on your Connect worker's CLASSPATH, like BlueApron did for Protobuf.
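As a rough illustration, a minimal sketch of such a converter is below. It assumes the records were written as raw binary Avro (no Confluent wire format / magic byte) and that you can supply a reader schema file via a converter property; the class name RawAvroConverter and the schema.path property are made up for this example, and AvroData from Confluent's avro-data artifact is used to map the Avro record to Connect's schema/value pair.

package com.example;

import io.confluent.connect.avro.AvroData;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

// Hypothetical converter: deserializes registry-less Avro bytes with a fixed
// reader schema, then converts the GenericRecord into Connect data.
public class RawAvroConverter implements Converter {

  private Schema readerSchema;
  private AvroData avroData;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // "schema.path" is an assumed property name pointing at an .avsc file
    String schemaPath = (String) configs.get("schema.path");
    try {
      readerSchema = new Schema.Parser().parse(new File(schemaPath));
    } catch (IOException e) {
      throw new DataException("Unable to parse reader schema at " + schemaPath, e);
    }
    avroData = new AvroData(100); // schema cache size
  }

  @Override
  public byte[] fromConnectData(String topic, org.apache.kafka.connect.data.Schema schema, Object value) {
    // Only the sink (read) path is needed for the S3 connector
    throw new UnsupportedOperationException("This sketch only covers toConnectData");
  }

  @Override
  public SchemaAndValue toConnectData(String topic, byte[] value) {
    if (value == null) {
      return SchemaAndValue.NULL;
    }
    try {
      GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerSchema);
      Decoder decoder = DecoderFactory.get().binaryDecoder(value, null);
      GenericRecord record = reader.read(null, decoder);
      return avroData.toConnectData(readerSchema, record);
    } catch (IOException e) {
      throw new DataException("Failed to deserialize raw Avro for topic " + topic, e);
    }
  }
}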

Or see if this works - https://github.com/farmdawgnation/registryless-avro-converter
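For reference, wiring a converter like the sketch above into the S3 sink config might look like the following; the class name and schema path are illustrative, and ParquetFormat is only available in newer versions of the S3 connector.

# Replace the registry-based AvroConverter with the custom converter;
# no schema.registry.url is needed in this setup.
value.converter=com.example.RawAvroConverter
value.converter.schema.path=/etc/connect/schemas/my-topic.avsc
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat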


NOT using Confluent Schema Registry

Then what registry are you using? Every registry that I know of has configuration options to interface with the Confluent one.

Upvotes: 1
