nilian

Reputation: 19

Can't deserialize protobuf data using Kafka-Connect with Schema-registry

I am trying to sink a Kafka topic (containing Protobuf data) into a Postgres table using Kafka Connect and Schema Registry.

My Kafka Connect connector config file:

{
  "name": "proto_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.password": "some_password",
    "topics": "some_topic",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.converter.schemas.enable": "false",
    "auto.evolve": "true",
    "connection.user": "some_user",
    "value.converter.schemas.enable": "true",
    "name": "sink_proto",
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://localhost:5432/some_db",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "insert.mode": "insert",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

I first use this command to register the Protobuf schema with Schema Registry:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"   --data '{"schema": "syntax = \"proto3\";\npackage nilian;\n\nmessage Person {\n  string name = 1;\n  int32 age = 2;\n}"}'   http://localhost:8081/subjects/some_topic-value/versions

Then I use this command to send the connector config to Kafka Connect:

curl -X POST -H "Content-Type: application/json" -d @connector_configs/json_conn.json http://localhost:8083/connectors

I then get this error:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic some_topic to Protobuf: 
        at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:154)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
        ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:228)
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:292)
        at io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:200)
        at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:132)
        ... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:133)
        ... 20 more

What is wrong with this?

Upvotes: 0

Views: 177

Answers (2)

c burke

Reputation: 5

Yes, it looks like the data coming from the topic into the connector was not serialized in the Confluent wire format (i.e., it has no magic byte). Use a free GUI tool like KafkIO to consume from the topic yourself and inspect the raw messages; seeing what the connector sees might give you a clue. You can choose whether or not to use a Schema Registry there, etc.
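If you'd rather inspect the bytes programmatically than use a GUI, here is a minimal heuristic check (the function name is mine, and the sample payloads are illustrative): a Confluent-serialized message starts with a `0x00` magic byte followed by a 4-byte big-endian schema ID.

```python
import struct

def looks_confluent_framed(payload: bytes) -> bool:
    """Heuristic: Confluent-serialized messages begin with a 0x00 magic
    byte followed by a 4-byte big-endian schema ID (IDs start at 1)."""
    if payload is None or len(payload) < 5:
        return False
    magic = payload[0]
    schema_id = struct.unpack(">I", payload[1:5])[0]
    return magic == 0 and schema_id > 0

# A bare Protobuf payload (no framing) fails the check:
print(looks_confluent_framed(b"\x0a\x04John\x10\x1e"))        # False

# The same payload behind a Confluent header (schema ID 1) passes:
print(looks_confluent_framed(b"\x00\x00\x00\x00\x01" + b"\x0a\x04John\x10\x1e"))  # True
```

Running this against the value bytes of a few records from `some_topic` will tell you quickly whether the producer skipped the Confluent framing.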

Upvotes: 0

Kriil

Reputation: 578

You are missing Confluent's magic byte. It has to be the first byte in the message, or you'll get this error.

https://www.confluent.io/blog/how-to-fix-unknown-magic-byte-errors-in-apache-kafka/
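To make the layout concrete, here is a rough sketch of the Confluent wire format for Protobuf (the schema ID and payload bytes are illustrative; the single `0x00` after the schema ID is the message-index list for the first message type in the schema):

```python
import struct

def confluent_frame(schema_id: int, proto_payload: bytes) -> bytes:
    """Wrap serialized Protobuf bytes in the Confluent wire-format header:
    magic byte 0x00, 4-byte big-endian schema ID, message-index list
    (a single 0 byte for the first message type), then the payload."""
    return b"\x00" + struct.pack(">I", schema_id) + b"\x00" + proto_payload

framed = confluent_frame(1, b"\x0a\x04John\x10\x1e")
print(framed[:6].hex())  # 000000000100
```

This is only to show what the converter expects to find; in practice you should not build the frame by hand. Produce to the topic with a Schema Registry-aware serializer (e.g. Confluent's Protobuf serializer for your client language), which writes this header for you. A producer that writes raw Protobuf bytes is exactly what causes the "Unknown magic byte!" error in your stack trace.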

Upvotes: 0
