Vova l

Reputation: 323

Kafka S3 JSON connector

I'm trying to put JSON to S3 using the latest Kafka Connect (confluent-platform-2.11). I set format.class=io.confluent.connect.s3.format.json.JsonFormat in the quickstart-s3.properties file.
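The relevant line in that file (properties syntax; the rest of the file matches the loaded config shown below):

format.class=io.confluent.connect.s3.format.json.JsonFormat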

Then I load the connector:

~$ confluent load s3-sink
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "s3_hose",
    "s3.region": "us-east-1",
    "s3.bucket.name": "some-bucket-name",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "name": "s3-sink"
  },
  "tasks": [
    {
      "connector": "s3-sink",
      "task": 0
    }
  ],
  "type": null
}

Then I sent one record to Kafka:

~$ kafka-console-producer --broker-list localhost:9092 --topic s3_hose

{"q":1}

And I see an Avro conversion exception in the connector log:

[2018-01-14 14:41:30,832] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: s3_hose
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Why is there an attempt to use an Avro converter if I set format.class=io.confluent.connect.s3.format.json.JsonFormat?

Upvotes: 1

Views: 2509

Answers (1)

dawsaw

Reputation: 2313

This message refers to the converter, which is different from the final output format. The converter is used to translate the data in Kafka into the Connect data API format so that connectors have a standard representation to work with. To set the converter, you can either:

1) set key.converter and value.converter to the built-in JsonConverter in your worker properties file, so it becomes the default for all connectors running in that worker, or

2) set the key.converter and value.converter properties at the connector level to override what is set at the worker level. Both approaches are sketched below.
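For option 1, a minimal sketch of the worker-level change, assuming your worker was started from one of the stock Confluent worker properties files (the Confluent CLI defaults to an Avro worker config, which is why the AvroConverter shows up in your stack trace):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

For option 2, the equivalent override added to the s3-sink connector config itself:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"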

Note that since this is a sink connector, you will want to match your converter to the type of data in the topic so it can be deserialized properly.
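In your case the topic holds bare JSON like {"q":1}, with no Connect schema/payload envelope, so you would also disable schemas on the JsonConverter, at whichever level you configured it:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

With schemas.enable left at its default of true, the JsonConverter expects every record to be wrapped in a {"schema": ..., "payload": ...} envelope and would fail on your messages.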

Upvotes: 3
