Reputation: 189
I am trying to use the S3 sink connector inside Kafka Connect. It starts and then fails later.
My config looks like:
{
  "name": "my-s3-sink3",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "mysource.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "topicbucket001",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE"
  }
}
My connect-distributed.properties looks like:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
errors.tolerance = all
Complete error log:
[2021-04-06 10:59:04,398] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Member connector-consumer-s3connect12-0-f1e48df8-76ba-49f9-9080-e10b0a34202b sending LeaveGroup request to coordinator **********.kafka.us-east-1.amazonaws.com:9092 (id: 2147483645 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-04-06 10:59:04,397] ERROR WorkerSinkTask{id=s3connect12-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2021-04-06 10:59:04,396] ERROR WorkerSinkTask{id=s3connect12-0} Error converting message key in topic 'quickstart-status' partition 3 at offset 0 and timestamp 1617706740956: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-04-06 10:59:04,393] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Resetting offset for partition quickstart-status-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[***************.kafka.us-east-1.amazonaws.com:9092 (id: 1 rack: use1-az2)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
A sample message looks like:
{
  "registertime": 1511985752912,
  "userid": "User_6",
  "regionid": "Region_8",
  "gender": "FEMALE"
}
Upvotes: 0
Views: 1661
Reputation: 189
So I was able to resolve the issue. Specifying the converters explicitly fixed the deserialization error. After that I had an issue with the S3 multipart upload, which was resolved by giving the Fargate task permission to the S3 bucket, i.e. attaching an S3 IAM policy to the ECS task definition. Thanks to Robin Moffatt for the solution!
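For anyone hitting the same multipart upload error, a minimal policy sketch for the bucket looks something like this (illustrative only, using the bucket name from my config; check the Confluent S3 sink connector docs for the complete list of required permissions):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts"
      ],
      "Resource": "arn:aws:s3:::topicbucket001/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::topicbucket001"
    }
  ]
}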
Upvotes: 0
Reputation: 32090
The problem is the Key SerDe. Per your screenshot the key data is a non-JSON string:
User_2
User_9
etc
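If you don't have the screenshot handy, you can check the keys yourself by consuming a few records with the keys printed. A quick sketch, where the broker address is a placeholder (the script is kafka-console-consumer.sh in the Apache Kafka distribution):
kafka-console-consumer --bootstrap-server <broker>:9092 \
  --topic mysource.topic \
  --property print.key=true \
  --property key.separator=" | " \
  --from-beginning \
  --max-messages 5
If the keys print as bare strings like User_2 rather than JSON objects, the JsonConverter will fail on them exactly as in your log.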
So instead of
key.converter=org.apache.kafka.connect.json.JsonConverter
use
key.converter=org.apache.kafka.connect.storage.StringConverter
Edit:
Try this for your connector config, specifying the converters explicitly (as suggested by @OneCricketeer):
{
  "name": "my-s3-sink3",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "mysource.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "topicbucket001",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE"
  }
}
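To apply it, save the JSON to a file (my-s3-sink3.json here is just an example name), submit it to the Connect REST API, and then check the task status. This assumes the worker's REST interface is on the default port 8083:
curl -s -X POST -H "Content-Type: application/json" \
  --data @my-s3-sink3.json \
  http://localhost:8083/connectors

curl -s http://localhost:8083/connectors/my-s3-sink3/status
If a connector with the same name already exists, delete it first (curl -X DELETE http://localhost:8083/connectors/my-s3-sink3) or PUT just the inner "config" object to /connectors/my-s3-sink3/config.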
Upvotes: 3