Reputation: 19
I am trying to sink a Kafka topic (containing Protobuf data) into a Postgres table using Kafka Connect and Schema Registry.
My Kafka Connect connector file:
{
  "name":"proto_sink",
  "config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.password":"some_password",
    "topics":"some_topic",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "key.converter.schemas.enable":"false",
    "auto.evolve":"true",
    "connection.user":"some_user",
    "value.converter.schemas.enable":"true",
    "name":"sink_proto",
    "auto.create":"true",
    "connection.url":"jdbc:postgresql://localhost:5432/some_db",
    "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
    "insert.mode":"insert",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter"
  }
}
I first use this command to register the Protobuf schema with Schema Registry:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "syntax = \"proto3\";\npackage nilian;\n\nmessage Person {\n string name = 1;\n int32 age = 2;\n}"}' http://localhost:8081/subjects/some_topic-value/versions
Then I use this command to send the connector config to Kafka Connect:
curl -X POST -H "Content-Type: application/json" -d @connector_configs/json_conn.json http://localhost:8083/connectors
Then I get this error:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic some_topic to Protobuf:
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:154)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:228)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:292)
at io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:200)
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:132)
... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:133)
... 20 more
What is wrong with this?
Upvotes: 0
Views: 177
Reputation: 5
Yes, it looks like the data coming from the topic into the connector was not serialized in the Schema Registry format (i.e., it has no magic byte). Use a free GUI tool like KafkIO to pull from the topic yourself and look at the messages, to get a sense of what the connector is seeing; it might give you a clue. You can choose to use a Schema Registry or not, etc.
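If you would rather check the bytes yourself, here is a minimal sketch of what "has no magic byte" means. It assumes you have already pulled one raw record value from the topic as bytes (how you fetch it is up to you); `inspect_value` and the sample payload are made up for illustration and are not part of any Kafka library.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def inspect_value(raw: bytes) -> dict:
    """Report whether a record value starts with the Confluent framing header."""
    # A framed value is: 1 magic byte (0) + 4-byte big-endian schema ID + payload.
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        return {"framed": False, "first_byte": raw[0] if raw else None}
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return {"framed": True, "schema_id": schema_id}


# Hypothetical sample: a hand-encoded Person(name="Al", age=30) with no framing.
# Its first byte is the Protobuf field tag 0x0a, not the magic byte 0.
plain = b"\x0a\x02Al\x10\x1e"
print(inspect_value(plain))
```

If the values on your topic look like `plain` here, the converter will reject them with "Unknown magic byte!" no matter what is registered in Schema Registry.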
Upvotes: 0
Reputation: 578
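You are missing Confluent's magic byte. It has to be the first byte of the message, or you'll get this error. Registering the schema with curl does not change the bytes already on the topic; the producer itself has to write each message with a Schema Registry-aware Protobuf serializer.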
https://www.confluent.io/blog/how-to-fix-unknown-magic-byte-errors-in-apache-kafka/
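To make the framing concrete, here is a rough sketch of the layout the converter expects for a Protobuf value: the magic byte, a 4-byte big-endian schema ID, a message-index section, then the Protobuf payload. The helper name `frame_protobuf`, the schema ID `1`, and the hand-encoded payload are assumptions for illustration only. In practice you would let a Schema Registry-aware serializer (for example Confluent's `KafkaProtobufSerializer`) produce this rather than building it by hand.

```python
import struct


def frame_protobuf(schema_id: int, payload: bytes) -> bytes:
    """Prefix a Protobuf payload with the Confluent wire-format header."""
    header = b"\x00" + struct.pack(">I", schema_id)  # magic byte + schema ID
    # A single 0 byte is the shorthand for "first message type in the schema",
    # which is the case for Person in the schema from the question.
    message_index = b"\x00"
    return header + message_index + payload


# Hand-encoded Person(name="Al", age=30); schema ID 1 is an assumed value,
# the real ID is whatever Schema Registry returned when the schema was registered.
payload = b"\x0a\x02Al\x10\x1e"
framed = frame_protobuf(1, payload)
print(framed.hex())
```

A message without those leading bytes starts directly with Protobuf field data, which is why the deserializer reports an unknown magic byte.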
Upvotes: 0