Narendra Makwana

Reputation: 1

AWS MSK Kafka Connect S3 Sink Connector

We are using the S3 Sink connector to sink data from our AWS MSK cluster into an S3 bucket.

We have deployed the Kafka S3 Sink connector on AWS EKS (Kubernetes).

When we start the connector, we get the errors below as soon as a multipart upload to the S3 bucket is attempted.

The S3 bucket has a policy restriction that enforces server-side encryption with AWS KMS, i.e. we cannot upload objects without specifying a KMS key.
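For context, the bucket policy follows the common pattern of denying uploads that do not request SSE-KMS; a rough sketch (bucket name and statement details are illustrative, not our exact policy):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyUnencryptedUploads",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::s3-bucket-name/*",
            "Condition": {
                "StringNotEquals": {
                    "s3:x-amz-server-side-encryption": "aws:kms"
                }
            }
        }
    ]
}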

The connector configuration we are using and the error details are below for reference.

Kindly help.

{
    "name": "kc-s3-nuoAccountIDs-sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "test-topic",
    "tasks.max": "1",
    "s3.bucket.name": "s3-bucket-name",
    "value.converter.schemas.enable": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "behavior.on.null.values": "ignore",
    "schema.compatibility": "NONE",
    "partition.duration.ms": 3600000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "partition.field.name": "accountplatform",
    "s3.region": "eu-west-2",
    "flush.size": 100000
}


kTask.execute(WorkerSinkTask.java:201)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to initiate MultipartUpload
  at io.confluent.connect.s3.storage.S3OutputStream.newMultipartUpload(S3OutputStream.java:230)
  at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:139)
  at io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:165)
  at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.commit(AvroRecordWriterProvider.java:102)
  at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.commit(KeyValueHeaderRecordWriterProvider.java:126)
at 

Upvotes: 0

Views: 1104

Answers (1)

Narendra Makwana

Reputation: 1

In our case we needed to pass the KMS key settings to the S3 connector.

Based on the official documentation, along with the settings mentioned above, we added the following two settings to the S3 connector configuration:

"s3.sse.kms.key.id": "<kms-key-id-here>",
"s3.ssea.name": "aws:kms"

We are now able to get data into our S3 bucket.
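For completeness, the full connector configuration with these two settings merged in looks roughly like this (the key ID is a placeholder; all other settings are unchanged from the question):

{
    "name": "kc-s3-nuoAccountIDs-sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "test-topic",
    "tasks.max": "1",
    "s3.bucket.name": "s3-bucket-name",
    "s3.region": "eu-west-2",
    "s3.ssea.name": "aws:kms",
    "s3.sse.kms.key.id": "<kms-key-id-here>",
    "value.converter.schemas.enable": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "behavior.on.null.values": "ignore",
    "schema.compatibility": "NONE",
    "partition.duration.ms": 3600000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "partition.field.name": "accountplatform",
    "flush.size": 100000
}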

Upvotes: 0
