Reputation: 481
I'm trying to use S3SinkConnector with the following settings:
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "flush.size": 1,
  "s3.bucket.name": "*****",
  "s3.object.tagging": "true",
  "s3.region": "us-east-2",
  "aws.access.key.id": "*****",
  "aws.secret.access.key": "*****",
  "s3.part.retries": 5,
  "s3.retry.backoff.ms": 1000,
  "behavior.on.null.values": "ignore",
  "keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "headers.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "store.kafka.headers": "true",
  "store.kafka.keys": "true",
  "topics": "***",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "kafka-backup",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "locale": "en-US",
  "timezone": "UTC",
  "timestamp.extractor": "Record"
}
The records in Kafka are stored in JSON format and were written there via io.confluent.connect.json.JsonSchemaConverter, so all messages have a strict schema.
When the sink connector tries to read records from Kafka, I get the exception "Avro schema must be a record." I don't understand why I get this error, since I don't use any Avro format.
The full stack trace:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:631)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Avro schema must be a record.
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:124)
at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:150)
at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)
at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:182)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:563)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:102)
at io.confluent.connect.s3.format.S3RetriableRecordWriter.write(S3RetriableRecordWriter.java:46)
at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:107)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
S3 connector version: 10.3.0; Kafka Connect version: 7.0.1
Upvotes: 2
Views: 1153
Reputation: 191738
You either need to not use ParquetFormat, or you need to produce Avro data; ParquetFormat requires Avro (see the source of the S3 sink).
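For example (a minimal sketch based on the config in your question, not a tested setup): if you want to keep the existing JSON Schema data, you could drop ParquetFormat and write the values with JsonFormat, which your config already uses for keys and headers:

"format.class": "io.confluent.connect.s3.format.json.JsonFormat"

Or, if you really want Parquet output, the topic data would need to be produced as Avro and read with the Avro converter (the schema registry URL below is just the one from your config):

"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"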
Upvotes: 0