KanikaM

Reputation: 189

Unable to use sink connector inside kafka connect

I am trying to use the S3 sink connector inside Kafka Connect. It starts, but fails later.

My config looks like:

{
    "name": "my-s3-sink3",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "mysource.topic",
        "s3.region": "us-east-1",
        "s3.bucket.name": "topicbucket001",
        "s3.part.size": "5242880",
        "flush.size": "1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.compatibility": "NONE"
    }
}
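
For reference, this is roughly how I submit that config to the Kafka Connect REST API (a sketch; the worker host/port localhost:8083 and the my-s3-sink3.json file name are assumptions, adjust for your setup):

# Register the connector by POSTing the JSON above (saved as my-s3-sink3.json)
# to the Connect worker's REST API (default port 8083 assumed)
curl -s -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @my-s3-sink3.json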

My connect-distributed.properties looks like:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
errors.tolerance = all
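
For completeness, the errors.* options can also be set per connector, for example to route records that fail conversion to a dead letter queue instead of killing the task. A sketch of what that could look like inside the connector "config" block (the DLQ topic name below is made up):

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-my-s3-sink3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true"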

Complete error log:

[2021-04-06 10:59:04,398] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Member connector-consumer-s3connect12-0-f1e48df8-76ba-49f9-9080-e10b0a34202b sending LeaveGroup request to coordinator **********.kafka.us-east-1.amazonaws.com:9092 (id: 2147483645 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2021-04-06 10:59:04,397] ERROR WorkerSinkTask{id=s3connect12-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

[2021-04-06 10:59:04,396] ERROR WorkerSinkTask{id=s3connect12-0} Error converting message key in topic 'quickstart-status' partition 3 at offset 0 and timestamp 1617706740956: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)

[2021-04-06 10:59:04,393] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Resetting offset for partition quickstart-status-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[***************.kafka.us-east-1.amazonaws.com:9092 (id: 1 rack: use1-az2)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)

Message format:

{
   "registertime": 1511985752912,
   "userid": "User_6",
   "regionid": "Region_8",
   "gender": "FEMALE"
}
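
To double-check what the keys and values on the topic actually look like, the console consumer can print both (a sketch; replace the broker placeholder below with your MSK endpoint):

# Print both key and value for each record so the key format can be verified
kafka-console-consumer.sh \
  --bootstrap-server <broker-endpoint>:9092 \
  --topic mysource.topic \
  --from-beginning \
  --property print.key=true \
  --property key.separator=" | "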

(screenshot of the topic data)

New error log: (screenshot)

Upvotes: 0

Views: 1661

Answers (2)

KanikaM

Reputation: 189

So I was able to resolve the issue. Specifying the converters explicitly fixed the deserialization error; after that I had an issue with the S3 multipart upload, which was resolved by giving the Fargate task permission to the S3 bucket (attaching an S3 IAM policy to the ECS task definition). Thanks, Robin Moffatt, for the solution!
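
For anyone hitting the same multipart upload failure, an IAM policy along these lines on the ECS task role should cover it (a sketch; the bucket name is taken from the question, and the exact action list is worth checking against the S3 sink connector docs):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads"
            ],
            "Resource": "arn:aws:s3:::topicbucket001"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": "arn:aws:s3:::topicbucket001/*"
        }
    ]
}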

Upvotes: 0

Robin Moffatt

Reputation: 32090

The problem is the key SerDe. Per your screenshot, the key data is a non-JSON string:

User_2
User_9
etc

So instead of

key.converter=org.apache.kafka.connect.json.JsonConverter

use

key.converter=org.apache.kafka.connect.storage.StringConverter

Edit:

Try this for your connector config, specifying the converters explicitly (as suggested by @OneCricketeer):

{
    "name": "my-s3-sink3",
     "config": {
         "connector.class"               : "io.confluent.connect.s3.S3SinkConnector",
         "tasks.max"                     : "1",
         "topics"                        : "mysource.topic",
         "s3.region"                     : "us-east-1",
         "s3.bucket.name"                : "topicbucket001",
         "s3.part.size"                  : "5242880",
         "flush.size"                    : "1",
         "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
         "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
         "value.converter.schemas.enable": "false",
         "storage.class"                 : "io.confluent.connect.s3.storage.S3Storage",
         "format.class"                  : "io.confluent.connect.s3.format.json.JsonFormat",
         "partitioner.class"             : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
         "schema.compatibility"          : "NONE"
        }
    }
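
If the connector is already registered, the same config map (just the contents of "config", without the name wrapper) can be pushed with a PUT so the connector is updated in place rather than recreated (the host/port and file name below are assumptions):

# Update the existing connector; the request body is only the "config" object,
# saved here as my-s3-sink3-config.json
curl -s -X PUT http://localhost:8083/connectors/my-s3-sink3/config \
     -H "Content-Type: application/json" \
     -d @my-s3-sink3-config.json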

Upvotes: 3
