Reputation: 95
I am using Confluent's Kafka s3 connect for copying data from apache Kafka to AWS S3.
The problem is that I have Kafka data in AVRO format which is NOT using Confluent Schema Registry’s Avro serializer and I cannot change the Kafka producer. So I need to deserialize existing Avro data from Kafka and then persist the same in parquet format in AWS S3. I tried using confluent's AvroConverter as value converter like this -
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro
And i am getting this error -
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
As far as I understand, "io.confluent.connect.avro.AvroConverter" will only work if the data is written in Kafka using Confluent Schema Registry’s Avro serializer and hence I am getting this error. So my question is Do I need to implement a generic AvroConverter in this case? And if yes, how do I extend the existing source code - https://github.com/confluentinc/kafka-connect-storage-cloud?
Any help here will be appreciated.
Upvotes: 1
Views: 3877
Reputation: 191743
You don't need to extend that repo. You just need to implement a Converter
(part of Apache Kafka) shade it into a JAR, then place it on your Connect worker's CLASSPATH
, like BlueApron did for Protobuf
Or see if this works - https://github.com/farmdawgnation/registryless-avro-converter
NOT using Confluent Schema Registry
Then what registry are you using? Each one that I know of has configurations to interface with the Confluent one
Upvotes: 1