Reputation: 1363
When creating a sink connector with the following configuration
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-west-2
topics.dir=topics
flush.size=3
schema.compatibility=NONE
topics=my_topic
tasks.max=1
s3.part.size=5242880
format.class=io.confluent.connect.s3.format.avro.AvroFormat
# added after comment
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=my-bucket
and running it, I get the following error:
org.apache.kafka.connect.errors.DataException: coyote-test-avro
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 91319
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:192)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
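For what it's worth, the registry configured above can also be queried directly for that ID (assuming its default REST port 8081, as in my converter settings); this is the kind of check I would expect to succeed:
# list the subjects known to the registry
curl http://localhost:8081/subjects
# look up the exact schema ID from the error
curl http://localhost:8081/schemas/ids/91319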
But my topic has a schema, as I can see in the UI provided by the landoop/fast-data-dev Docker container. And even if I try to write raw data to S3 by changing the following configs
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE
and removing schema.generator.class, the same error appears, even though, to my understanding, this should not involve an Avro schema at all.
To be able to write to S3 I set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in my container, but it seems the problem occurs before that point anyway.
I imagine there might be a problem with the versions. As mentioned above, I use the landoop/fast-data-dev container in a docker-machine (it doesn't work in the native Docker for Mac), and the producer and consumer work perfectly.
This is the about section of the fast-data-dev UI (screenshot with the component versions):
I looked at the Connect logs, but I couldn't find any helpful information in them; if you can tell me what I should look for, I will add the relevant lines (the full logs are too big).
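For instance, this is roughly how I have been filtering them (the log file path inside the container is my assumption and may differ in your setup):
# keep only warnings, errors and (de)serialization problems from the Connect worker log
grep -iE "ERROR|WARN|SerializationException|DataException" /var/log/connect-distributed.log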
Upvotes: 0
Views: 322
Reputation: 191743
Every single message in the topic must be encoded as Avro in the wire format that the Schema Registry serializers produce.
The converter looks at the four bytes that follow the magic byte of the raw Kafka data (keys and values), interprets them as an integer (in your case, the ID in the error), and looks that ID up in the registry.
If the data isn't Avro in that format, or is otherwise bad, you get either the error you see here or an error about an invalid magic byte.
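To make that concrete, the ID in your error (91319, which is 0x000164B7 in hex) means the converter expects every key and value to start with these five bytes:
0x00    0x00 0x01 0x64 0xB7    <avro-encoded payload>
magic   schema ID 91319        rest of the record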
And this error isn't a Connect error. You can reproduce it using the Avro console consumer if you add the print.key property.
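Something along these lines, for example (topic name and registry URL taken from your config; the broker address is an assumption):
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic my_topic \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true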
Assuming that is the case, one solution is to change the key converter to the byte-array converter so that the Avro registry lookup is skipped for keys.
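For example, keeping your value settings and only switching the key side of the config you posted:
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# key.converter.schema.registry.url is then no longer needed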
Otherwise, since you cannot delete the messages in Kafka, the only solution here is to figure out why the producers are sending bad data and fix them, then either move the Connect consumer group to the latest offset with valid data, wait for the invalid data to expire on the topic, or move to a new topic entirely.
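For the offset option: sink connectors consume with a group named connect-<connector name>, so with the connector stopped, a reset would look roughly like this (broker address and connector name are placeholders; you may want --to-offset instead of --to-latest to land on a specific valid position):
kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group connect-<connector-name> \
  --topic my_topic \
  --reset-offsets --to-latest \
  --execute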
Upvotes: 1