Reputation: 523
I have the following dataframe schema:
root
|-- sentence: string (nullable = true)
|-- category: string (nullable = true)
I am writing the above dataframe successfully into a Kafka topic (s3.topic). Next I want to read s3.topic from Kafka and store the data in an S3 bucket. For this I am using the Kafka Connect S3 sink connector. I have made the following config changes in the connect-distributed.properties file:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
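With schemas.enable=false on both converters, each record value should be accepted as plain JSON with no envelope, e.g. a record matching the dataframe schema (the values here are made up for illustration):
{"sentence": "some example sentence", "category": "some category"}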
Following is the curl command I use to create the connector that writes into the S3 bucket:
curl -X POST \
  localhost:8084/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
    "name": "S3SinkConnector",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "s3.region": "<region>",
      "flush.size": "1000",
      "topics": "s3.topic",
      "tasks.max": "1",
      "aws.secret.access.key": "<key>",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "aws.access.key.id": "<keyId>",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.bucket.name": "<s3_bucket_name>",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }'
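The connector gets created; its task state can be inspected through the standard Connect REST status endpoint (same host/port as above):
# check the state of the sink connector and its tasks
curl -s localhost:8084/connectors/S3SinkConnector/status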
And I am getting the following error
ERROR WorkerSinkTask{id=S3SinkConnector904-0} Error converting message value in topic 'emoji.analysis009' partition 0 at offset 0 and timestamp 1617585788233: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
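For reference, the "schema" and "payload" envelope that the error refers to is what the JsonConverter expects when schemas.enable=true; for my dataframe it would look roughly like this (values are made up for illustration):
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "sentence", "type": "string", "optional": true},
      {"field": "category", "type": "string", "optional": true}
    ],
    "optional": false
  },
  "payload": {"sentence": "some example sentence", "category": "some category"}
}
My data is plain JSON without this envelope, which is why I set schemas.enable=false everywhere.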
How can I write my topic data into the S3 bucket with the printSchema() output shown above?
UPDATE
Following is the printSchema() output of the dataframe written into the Kafka topic:
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
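For context, a minimal sketch of how such a key/value dataframe could be produced and written to the topic (PySpark; the key choice, to_json/struct step, and broker address are illustrative assumptions, not my exact code):
from pyspark.sql.functions import to_json, struct, col

# The Kafka sink requires string/binary "key" and "value" columns,
# so each row is serialized to a JSON string for the value.
kafka_df = df.select(
    col("category").alias("key"),  # illustrative key choice
    to_json(struct("sentence", "category")).alias("value")
)

(kafka_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # assumed broker address
    .option("topic", "s3.topic")
    .save())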
Upvotes: 1
Views: 708