Mr T.

Reputation: 4498

Kafka Connect - S3 Sink does not consume from topic

I have an S3 sink connector that reads from a Kafka topic and writes the records to S3. The connector is not consuming from the topic.

This is the connector configuration:

{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "behavior.on.null.values": "ignore",
    "s3.region": "<aws region>",
    "topics.dir": "my-topic",
    "flush.size": "1000",
    "tasks.max": "1",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "rotate.interval.ms": "30000",
    "locale": "en-US",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "aws.access.key.id": "<aws access key>",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "s3.bucket.name": "<aws bucket>",
    "partition.duration.ms": "30000",
    "schema.compatibility": "NONE",
    "topics": "my-topic",
    "aws.secret.access.key": "<aws secret key>",
    "task.class": "io.confluent.connect.s3.S3SinkTask",
    "errors.deadletterqueue.topic.name": "dlq-my-topic",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "my-connector",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "rotate.schedule.interval.ms": "60000",
    "timestamp.extractor": "Record"
  }
}

This is the connector status:

curl localhost:8083/connectors/my-connector/status
{
  "name": "my-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "localhost:8083"
    }
  ],
  "type": "sink"
}

Describing the Kafka consumer group with ./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group connect-my-connector --describe gives:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-my-connector my-topic 0 1182 12072 10890 connector-consumer-my-connector-0-68793e0d-8312-4d20-b23c-5221ca54b0dc ip connector-consumer-my-connector-0

So it seems that Kafka Connect has an active consumer in the consumer group. What could be the reason the connector is not consuming from Kafka?
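The numbers in the consumer group output above can be checked with a short script. This is a minimal sketch, assuming the whitespace-separated column layout shown in the question; parse_group_row is a hypothetical helper, not part of any Kafka tooling.

```python
def parse_group_row(row: str) -> dict:
    """Parse one data row of `kafka-consumer-groups.sh --describe` output.

    Assumes the column order shown in the question:
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    """
    fields = row.split()
    current, log_end = int(fields[3]), int(fields[4])
    return {
        "group": fields[0],
        "topic": fields[1],
        "partition": int(fields[2]),
        "current_offset": current,
        "log_end_offset": log_end,
        # Recomputed here rather than trusting the LAG column.
        "lag": log_end - current,
        # Assumption: the tool prints "-" when no member owns the partition.
        "has_member": fields[6] != "-",
    }


row = (
    "connect-my-connector my-topic 0 1182 12072 10890 "
    "connector-consumer-my-connector-0-68793e0d-8312-4d20-b23c-5221ca54b0dc "
    "ip connector-consumer-my-connector-0"
)
info = parse_group_row(row)
print(info["current_offset"], info["lag"], info["has_member"])
```

A non-zero committed offset (1182 here) suggests the group committed at some point. Running the describe command twice, a minute or so apart, and comparing current_offset would show whether it is still advancing.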

Looking at this, the answer doesn't make sense, as authentication isn't enabled in our Kafka cluster. What else could cause a connected Kafka Connect connector not to consume messages from a topic? There are no apparent errors in the logs during Connect startup, plugin loading, or connector initialization.
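One possible reason for a quiet log is the error-handling configuration itself: with errors.tolerance set to all, Kafka Connect skips records that fail conversion instead of failing the task, and errors.log.enable defaults to false. The sketch below only inspects a config dictionary for that combination. silent_error_settings is a hypothetical helper, and whether this applies to the connector in question is an assumption, not a diagnosis.

```python
def silent_error_settings(config: dict) -> list:
    """Return notes on settings that can hide record-level failures."""
    notes = []
    if config.get("errors.tolerance", "none") == "all":
        notes.append(
            "errors.tolerance=all: failed records are skipped, the task keeps running"
        )
        if config.get("errors.log.enable", "false") != "true":
            notes.append(
                "errors.log.enable is not true: skipped records are not logged"
            )
        dlq = config.get("errors.deadletterqueue.topic.name")
        if dlq:
            notes.append(f"check topic {dlq} for records that failed")
    return notes


# Subset of the connector configuration from the question.
question_config = {
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-my-topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
}
notes = silent_error_settings(question_config)
for note in notes:
    print(note)
```

If the dead letter queue topic turns out to contain records, that would point toward a conversion problem rather than a connectivity one.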

Upvotes: 0

Views: 51

Answers (1)

user152468

Reputation: 3242

Yes, the configuration above did not consume for me either. After I changed the value.converter configuration as shown below, it started consuming and writing to S3. The data in my source topic is Avro.

What is the format of the data in your topic?
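If you are not sure, the first bytes of a raw record value give a hint. Serializers that work with Confluent Schema Registry prefix each value with a zero magic byte followed by a 4-byte big-endian schema ID. The following is a heuristic sketch only: guess_value_format is a hypothetical helper, and the same framing is used by the registry's non-Avro serializers, so a match suggests AvroConverter but does not prove it.

```python
import json
import struct


def guess_value_format(raw: bytes) -> str:
    """Heuristically classify the raw bytes of a Kafka record value."""
    # Schema Registry framing: magic byte 0, then a 4-byte big-endian schema ID.
    if len(raw) >= 5 and raw[0] == 0:
        (schema_id,) = struct.unpack(">I", raw[1:5])
        return f"schema-registry framed (schema id {schema_id})"
    try:
        json.loads(raw.decode("utf-8"))
        return "json"
    except (UnicodeDecodeError, json.JSONDecodeError):
        return "unknown"


print(guess_value_format(b"\x00\x00\x00\x00\x07payload"))
print(guess_value_format(b'{"viewtime": 1, "pageid": "Page_1"}'))
```

The raw bytes can come from any consumer client that returns values undecoded. Values classified as json would fit JsonConverter, while framed values would fit the AvroConverter setup shown in this answer.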

You may also want to set the Connect worker's log level to TRACE for the package io.confluent.connect.s3 in your log4j.properties file to get more information.

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "behavior.on.null.values": "ignore",
    "s3.region": "$REGION",
    "topics.dir": "pageviews",
    "flush.size": "10",
    "tasks.max": "1",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "rotate.interval.ms": "600000",
    "locale": "en-US",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "aws.access.key.id": "${AWS_ACCESS_KEY_ID}",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "s3.bucket.name": "s3-sink-test",
    "partition.duration.ms": "30000",
    "schema.compatibility": "NONE",
    "topics": "pageviews",
    "aws.secret.access.key": "${AWS_SECRET_ACCESS_KEY}",
    "task.class": "io.confluent.connect.s3.S3SinkTask",
    "errors.deadletterqueue.topic.name": "dlq-my-topic",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm",
    "rotate.schedule.interval.ms": "60000",
    "timestamp.extractor": "Record"
  }
}
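For the trace logging suggested above, the log level can also be changed at runtime instead of editing log4j.properties. This is a sketch assuming a Connect worker recent enough to expose the /admin/loggers REST endpoint (Apache Kafka 2.4 or later) and reachable at localhost:8083 as in the question; build_log_level_request is a hypothetical helper name.

```python
import json
import urllib.request


def build_log_level_request(worker_url: str, logger: str, level: str) -> urllib.request.Request:
    """Build a PUT /admin/loggers/<logger> request for a Connect worker."""
    body = json.dumps({"level": level}).encode("utf-8")
    return urllib.request.Request(
        url=f"{worker_url.rstrip('/')}/admin/loggers/{logger}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = build_log_level_request(
    "http://localhost:8083", "io.confluent.connect.s3", "TRACE"
)
print(req.get_method(), req.full_url)
# To actually send it against a running worker:
# urllib.request.urlopen(req)
```

A level set this way is not persisted across worker restarts. The file-based equivalent in a log4j.properties file is the line log4j.logger.io.confluent.connect.s3=TRACE.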

Upvotes: 0
