Mikhail
Mikhail

Reputation: 2929

ElasticsearchSinkConnector Failed to deserialize data to Avro

I created the simplest kafka sink connector config and I'm using confluent 4.1.0:

{
  "connector.class": 
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}

and in the topic I save the messages in JSON

{ "topics": "resd"}

But in the result I get an error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Upvotes: 4

Views: 2429

Answers (3)

peedee
peedee

Reputation: 3739

There is also another scenario where this Unknown magic byte! error will show up, and that is if the messages were written with a schema, but later that schema version was deleted from the schema registry. The Confluent libraries (at least in the version which I'm currently testing, Confluent Platform v7.1.1) will show the same errors because they cannot find a valid schema for the messages.

Upvotes: 1

Robin Moffatt
Robin Moffatt

Reputation: 32090

As cricket_007 says, you need to tell Connect to use Json deserialiser, if that's the format your data is in. Add this to your connector configuration:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"

Upvotes: 5

OneCricketeer
OneCricketeer

Reputation: 191728

That error happens because it's trying to read non Confluent Schema Registry encoded Avro messages.

If the topic data is Avro, it needs to use the Schema Registry.

Otherwise, if topic data is JSON, then you've started the connect cluster with AvroConverter on your keys or values in the property file, where you need to use the JsonConverter instead

Upvotes: 2

Related Questions