Yannick Widmer

Reputation: 1363

Error using S3 connector with Landoop docker container for Kafka

When creating a sink connector with the following configuration

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-west-2
topics.dir=topics
flush.size=3
schema.compatibility=NONE
topics=my_topic
tasks.max=1
s3.part.size=5242880
format.class=io.confluent.connect.s3.format.avro.AvroFormat
# added after comment 
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=my-bucket

and running it, I get the following error:

org.apache.kafka.connect.errors.DataException: coyote-test-avro
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 91319
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:192)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

But my topic has a schema, as I can see in the UI provided by the landoop/fast-data-dev docker container. And even if I try to write raw data to S3 by changing the following configs

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE

and removing schema.generator.class, the same error appears, even though, to my understanding, this should not involve an Avro schema at all.
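
The registry can also be queried directly to double-check this; for instance (assuming the registry on its default port 8081 and the default subject naming), listing the subject's versions and looking up the ID from the error:

curl http://localhost:8081/subjects/my_topic-value/versions
curl http://localhost:8081/schemas/ids/91319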

To be able to write to S3, I set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in my container, but it seems the problem occurs before that point anyway.
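
Concretely, I pass them along these lines when starting the container (other port mappings omitted here; 3030 is the fast-data-dev UI):

docker run --rm -it \
  -p 3030:3030 \
  -e AWS_ACCESS_KEY_ID=... \
  -e AWS_SECRET_ACCESS_KEY=... \
  landoop/fast-data-dev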

I imagine there might be a problem with the versions. As mentioned above, I use the landoop/fast-data-dev container in a docker-machine (it doesn't work in the native Docker for Mac), and the producer and consumer work perfectly. This is the About section:

[screenshot of the fast-data-dev About section listing the component versions]

I looked at the Connect logs but couldn't find any helpful information. However, if you can tell me what I should look for, I will add the relevant lines (the full logs are too big).

Upvotes: 0

Views: 322

Answers (1)

OneCricketeer

Reputation: 191743

Every single message on the topic must be encoded as Avro, in the wire format the Schema Registry serializers produce.

The converter looks at bytes 2-5 of the raw Kafka data (keys and values alike), converts them to an integer (in your case, the schema ID 91319 in the error), and looks that ID up in the registry; the first byte is a "magic byte" marking the wire format.
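
For illustration, a minimal sketch (not the actual Confluent code) of that wire-format check:

import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Confluent wire format: byte 0 is a magic byte (0x0),
    // bytes 1-4 hold the schema ID as a big-endian int,
    // and the Avro payload follows.
    static int schemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // 0x000164B7 == 91319, the ID from the stack trace above
        byte[] example = {0x0, 0x00, 0x01, 0x64, (byte) 0xB7};
        System.out.println(schemaId(example)); // prints 91319
    }
}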

If the data is not Avro, or is otherwise malformed, you get either the error shown here or an error about an invalid magic byte.

And this isn't a Connect-specific error; you can reproduce it with the Avro console consumer if you add the print.key property.
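
Something along these lines (assuming the topic and registry URL from your config) should fail on the same record:

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic my_topic \
  --from-beginning \
  --property print.key=true \
  --property schema.registry.url=http://localhost:8081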

Assuming that is the case, one solution is to change the key converter to the byte-array one, so it skips the Avro lookup entirely.
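
With your config, that is a one-line change (also drop key.converter.schema.registry.url, which is then no longer needed):

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter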

Otherwise, since you cannot delete individual messages in Kafka, the only remaining solution is to figure out why the producers are sending bad data and fix them. Then either move the Connect consumer group to the latest offset with valid data, wait for the invalid data to expire on the topic, or move to a new topic entirely.
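
Moving the group can be done with the standard offset-reset tool while the connector is stopped. Sink connectors use a consumer group named connect-<connector-name> by default, so roughly (use --to-offset instead of --to-latest if you want to stop at a specific position):

kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group connect-<your-connector-name> \
  --topic my_topic \
  --reset-offsets --to-latest \
  --execute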

Upvotes: 1
