Reputation: 323
I am trying to write JSON to S3 using the latest Kafka Connect (confluent-platform-2.11). I set format.class=io.confluent.connect.s3.format.json.JsonFormat in the quickstart-s3.properties file
and load the connector:
~$ confluent load s3-sink
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "s3_hose",
    "s3.region": "us-east-1",
    "s3.bucket.name": "some-bucket-name",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "name": "s3-sink"
  },
  "tasks": [
    {
      "connector": "s3-sink",
      "task": 0
    }
  ],
  "type": null
}
Then I sent one record to Kafka:
~$ kafka-console-producer --broker-list localhost:9092 --topic s3_hose
{"q":1}
And I see an Avro conversion exception in the connector log:
[2018-01-14 14:41:30,832] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) org.apache.kafka.connect.errors.DataException: s3_hose
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Why is there an attempt to use an Avro converter if I set format.class=io.confluent.connect.s3.format.json.JsonFormat?
Upvotes: 1
Views: 2509
Reputation: 2313
This message refers to the converter, which is different from the final output format. The converter translates the data in Kafka into the Connect data API format so that connectors have a standard representation to work with. To set the converter you can either:
1) set key.converter and value.converter to the built-in JsonConverter in your worker properties file, so it becomes the default for all connectors running in that worker, or
2) set the key.converter and value.converter properties at the connector level to override what is set at the worker level.
Note that since this is a sink connector, you will want to match the converter to the type of data actually in the topic so it can be deserialized properly.
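For a schemaless JSON payload like {"q":1}, a sketch of option 1 might look like the following in the worker properties file (setting schemas.enable=false tells JsonConverter not to expect the {"schema": ..., "payload": ...} envelope around each message):

```properties
# Worker properties: default converters for all connectors on this worker
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```

For option 2, the same keys can instead be added to the s3-sink connector's own configuration, where they override the worker-level defaults for just that connector:

```properties
# Added to the s3-sink connector configuration only
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

Either way, the "Unknown magic byte!" error should disappear, since that error comes from AvroConverter expecting the Confluent wire format (a magic byte plus schema ID) at the start of each message, which plain JSON from the console producer does not have.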
Upvotes: 3