Zamir Arif

Reputation: 341

Confluent 4.0.0 Kafka Connect - Schema Registry Subject Not Found : org.apache.kafka.connect.errors.DataException:

I have checked two similar questions, but they don't help.

node 1 [/appl/node1/confluent-4.0.0] ./bin/confluent status elasticsearch-sink


{"name":"elasticsearch-sink","connector":{"state":"RUNNING","worker_id":"10.192.226.24:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.DataException:
emailfilters\n\tat
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\nCaused by:
org.apache.kafka.common.errors.SerializationException: Error
retrieving Avro schema for id 21\nCaused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Subject not found.; error code: 40401\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)\n\tat
io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:272)\n\tat
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:71)\n\tat
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:182)\n\tat
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)\n\tat
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)\n\tat
io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)\n\tat
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\n","id":0,"worker_id":"10.192.226.24:8083"}],"type":"sink"}

My properties:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=emailfilters
key.ignore=true
connection.url=http://127.0.0.1:9197
type.name=kafka-connect

I tried adding the following, but I still get the same error.

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://node1:9193

My topic is being populated from KSQL stream.
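The "Error retrieving Avro schema for id 21" in the trace comes from the Confluent wire format: each Avro-serialized message starts with a magic byte `0x00` followed by a 4-byte big-endian schema ID, which the converter then looks up in the Schema Registry. A minimal sketch of how that ID is recovered from a raw message (the function name is illustrative, not part of any Confluent API):

```python
import struct

def extract_schema_id(payload: bytes) -> int:
    """Parse the Confluent wire format: 1 magic byte (0x00),
    a 4-byte big-endian schema ID, then the Avro-encoded data."""
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not Confluent-Avro wire format")
    return struct.unpack(">I", payload[1:5])[0]

# A record serialized against schema ID 21 starts with these 5 bytes:
header = b"\x00\x00\x00\x00\x15"
print(extract_schema_id(header + b"...avro data..."))  # 21
```

So the deserializer is finding ID 21 in the message bytes and asking the registry about it; the failure happens on the registry side when the corresponding subject lookup returns 40401.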

Upvotes: 1

Views: 14190

Answers (3)

noelyahan

Reputation: 4255

The Kafka connector config should also contain the following field:

"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"

There are a few other Avro subject-name strategies as well.

Since a topic can contain records of multiple Avro schema types, we need a way to identify the right schema for each specific record.
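As an illustration (a sketch, not Confluent code), the different strategies derive the Schema Registry subject that gets looked up roughly like this; the default `TopicNameStrategy` expects a subject named `<topic>-value`, while `RecordNameStrategy` uses the Avro record's fully qualified name:

```python
def subject_for(strategy: str, topic: str, record_fullname: str,
                is_key: bool = False) -> str:
    """Sketch of how Confluent's subject-name strategies derive the
    Schema Registry subject for a message's value (or key)."""
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":        # the default
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":       # Avro record's full name
        return record_fullname
    if strategy == "TopicRecordNameStrategy":  # topic plus record name
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")

# "com.example.EmailFilter" is a hypothetical record name for illustration:
print(subject_for("TopicNameStrategy", "emailfilters", "com.example.EmailFilter"))
# emailfilters-value
print(subject_for("RecordNameStrategy", "emailfilters", "com.example.EmailFilter"))
# com.example.EmailFilter
```

If the producer registered the schema under one strategy and the consumer looks it up under another, the registry answers with exactly this "Subject not found" error.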

Upvotes: 0

OneCricketeer

Reputation: 191894

I have noticed that if you set key.converter to Avro, it will still try to extract a schema from strings, ints, and other non-Avro datatypes, and then throw these confusing errors. The only validation it performs is checking whether the byte payload starts with a 0; it then just reads the next 4 bytes as the schema ID, which doesn't really prove the data is Avro.

Check the datatype of your topic's key, then check your Connect properties.
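To illustrate the false positive described above (a sketch, assuming the key was written by Kafka's plain LongSerializer, which emits an 8-byte big-endian long): a non-Avro key can easily start with a zero byte and sail past the magic-byte check, producing a bogus schema ID lookup.

```python
import struct

def looks_like_confluent_avro(payload: bytes) -> bool:
    """The only check the Avro deserializer effectively performs:
    a 0x00 magic byte followed by at least 4 bytes of schema ID."""
    return len(payload) >= 5 and payload[0] == 0

# A key written by Kafka's LongSerializer (8-byte big-endian long 21):
long_key = struct.pack(">q", 21)  # b'\x00\x00\x00\x00\x00\x00\x00\x15'

print(looks_like_confluent_avro(long_key))    # True -- false positive
print(struct.unpack(">I", long_key[1:5])[0])  # 0 -- bogus "schema ID"
```

The deserializer would then ask the registry for schema ID 0, which does not exist, and surface a confusing registry error instead of a clear "this key is not Avro" message.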

Upvotes: 0

Robin Moffatt

Reputation: 32110

This is the cause of the failure:

Caused by:
org.apache.kafka.common.errors.SerializationException: Error
retrieving Avro schema for id 21\nCaused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Subject not found.; error code: 40401

This means that the Schema Registry you've defined doesn't have the schema for the data on the topic. Are you specifying the same Schema Registry in your Connector config as you are using with KSQL?
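For example, both sides should reference the same registry URL; the fragment below is a sketch (property names as commonly used in Confluent Platform, but verify them against your version's documentation):

```properties
# ksql-server.properties -- the registry KSQL registers schemas with
ksql.schema.registry.url=http://node1:9193

# Connect worker / connector config -- must point at the SAME registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://node1:9193
```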

Upvotes: 1
