Reputation: 341
I have checked the 2 similar issues but it doesn't help.
node 1 [/appl/node1/confluent-4.0.0] ./bin/confluent status elasticsearch-sink
{"name":"elasticsearch-sink","connector":{"state":"RUNNING","worker_id":"10.192.226.24:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.DataException:
emailfilters\n\tat
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\nCaused by:
org.apache.kafka.common.errors.SerializationException: Error
retrieving Avro schema for id 21\nCaused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Subject not found.; error code: 40401\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:272)\n\tat
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:71)\n\tat
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:182)\n\tat
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)\n\tat
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)\n\tat
io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)\n\tat
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\n","id":0,"worker_id":"10.192.226.24:8083"}],"type":"sink"}
my properties :
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=emailfilters
key.ignore=true
connection.url=http://127.0.0.1:9197
type.name=kafka-connect
tried adding the following but still get the same error.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://node1:9193
My topic is being populated from KSQL stream.
Upvotes: 1
Views: 14190
Reputation: 4255
kafka connector configs should contains the below config field as well
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
There are few other avro subject identification strategies more
Since the topic data can contain multiple avro schema types we need to provide a way to identify the right data type according to the specific record
Upvotes: 0
Reputation: 191894
I have noticed if using key.converter
to Avro, then it will still try to extract a schema from string, ints, etc. non-avro datatypes, then give these confusing errors. The only validation it checks if it the byte payload starts with a 0
, then just gets the next 4 bytes, which doesn't really say that it definitely is not Avro.
Check the datatype for your topic's key, then check your Connect properties
Upvotes: 0
Reputation: 32110
This is the cause of the failure:
Caused by:
org.apache.kafka.common.errors.SerializationException: Error
retrieving Avro schema for id 21\nCaused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Subject not found.; error code: 40401
This means that the Schema Registry you've defined doesn't have the schema for the data on the topic. Are you specifying the same Schema Registry in your Connector config as you are using with KSQL?
Upvotes: 1