Avro schema:
{
  "type": "record",
  "name": "Envelope",
  "namespace": "test",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            }
          ],
          "connect.name": "test.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
  ],
  "connect.name": "test.Envelope"
}
S3 sink connector configuration:
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "behavior.on.null.values": "ignore",
  "s3.region": "us-west-2",
  "flush.size": "1",
  "tasks.max": "3",
  "timezone": "UTC",
  "locale": "US",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "aws.access.key.id": "---",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "s3.bucket.name": "test-s3-sink-created-at-partition",
  "partition.duration.ms": "1000",
  "topics.regex": "test_topic",
  "aws.secret.access.key": "---",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "value.converter.schemas.enable": "false",
  "name": "s3-sink-created-at-partition",
  "errors.tolerance": "all",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "YYYY/MM/dd",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"
}
Error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: The field 'createdAt' does not exist in
Caused by: org.apache.kafka.connect.errors.DataException: Unable to find nested field 'createdAt'
Problem:
I am trying to sink data from test_topic into an S3 bucket with the connector above, partitioning by the createdAt field, but it keeps throwing an error about the createdAt field. Also, the S3 bucket is not created with the above configuration. Any suggestions?
Answer:
You should be able to use after.Value.createdAt as the timestamp.field.
- See my PR
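For reference, the nested-path approach only changes the timestamp settings of the question's configuration; everything else stays the same. This is a sketch that assumes the nested-field lookup from that PR is available in the kafka-connect-storage-common version installed on the Connect worker. The exact path depends on how the record value is deserialized, so check after.Value.createdAt against an actual record on the topic:

```json
{
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "after.Value.createdAt"
}
```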
But the better option is to unwrap the envelope, like you're asking.
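Here is a minimal sketch of unwrapping with Apache Kafka's built-in ExtractField$Value transform, which replaces each record value with its after field so that createdAt becomes a top-level field. This transform is swapped in for illustration; Debezium's own ExtractNewRecordState transform is the more common choice for Debezium envelopes, but it is a separate plugin. The alias unwrap is an arbitrary, hypothetical name. Delete events have a null after, so the unwrapped value is null, and the existing "behavior.on.null.values": "ignore" setting should skip those records:

```json
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.unwrap.field": "after",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"
}
```

Because the partitioner sees the record after the transform has run, timestamp.field can go back to the plain createdAt from the original configuration.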
s3-bucket is not created using above configuration
You need to create the bucket ahead of time.