Reputation: 141
We are running Kafka Connect (Confluent Platform 5.4, i.e. Kafka 2.4) in distributed mode using the Debezium (MongoDB) and Confluent S3 connectors. When adding a new connector via the REST API, the connector is created in the RUNNING state, but no tasks are created for it.
Pausing and resuming the connector does not help. When we stop all workers and then start them again, the tasks are created and everything runs as it should.
The issue is not caused by the connector plugins, because we see the same behaviour for both the Debezium and S3 connectors. Also, in the debug logs I can see that Debezium correctly returns a task configuration from the Connector.taskConfigs() method.
Can somebody tell me what to do so that we can add connectors without restarting the workers? Thanks.
Configuration details
The cluster has 3 nodes with the following connect-distributed.properties:
bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5
config.storage.topic=connect-configs-qa
config.storage.replication.factor=3
status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083
plugin.path=/opt/kafka-connect/plugins,/usr/share/java/
security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>
max.request.size=20000000
max.partition.fetch.bytes=20000000
The connectors configuration
Debezium example:
{
"name": "qa-mongodb-comp-converter-task|1",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
"mongodb.name": "qa-debezium-comp",
"mongodb.ssl.enabled": true,
"collection.whitelist": "converter[.]task",
"tombstones.on.delete": true
}
}
S3 example:
{
"name": "qa-s3-sink-task|1",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "qa-debezium-comp.converter.task",
"topics.dir": "data/env/qa",
"s3.region": "eu-west-1",
"s3.bucket.name": "<bucket-name>",
"flush.size": "15000",
"rotate.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"transforms": "ExtractDocument",
"transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
}
}
The connectors are created using curl:
curl -X POST -H "Content-Type: application/json" --data @<json_file> http://<connect_host>:10083/connectors
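To confirm whether tasks were actually created after the POST, the Connect REST API can be queried at /connectors/<name>/tasks. Below is a minimal Python sketch of that check. The helper names tasks_url and list_tasks are made up for illustration, and the base URL is whatever host and port the workers listen on (10083 in the configuration above).

```python
import json
import urllib.parse
import urllib.request


def tasks_url(base_url: str, connector_name: str) -> str:
    """Build the URL for GET /connectors/<name>/tasks (hypothetical helper)."""
    # Percent-encode the name so characters such as "|" are safe in the path.
    encoded = urllib.parse.quote(connector_name, safe="")
    return f"{base_url.rstrip('/')}/connectors/{encoded}/tasks"


def list_tasks(base_url: str, connector_name: str) -> list:
    """Return the task list reported by the worker; an empty list means no tasks."""
    with urllib.request.urlopen(tasks_url(base_url, connector_name), timeout=10) as resp:
        return json.load(resp)
```

Calling list_tasks("http://<connect_host>:10083", "<connector-name>") shortly after creating the connector should return a non-empty list once tasks have been assigned.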
Upvotes: 8
Views: 7577
Reputation: 141
The problem was caused by the | character in the names of the connectors. In older Kafka Connect versions, special characters in connector names were not properly URL-encoded in communication between workers. More details are in https://issues.apache.org/jira/browse/KAFKA-9747
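As a quick way to spot names that could hit this, here is a small Python sketch that lists the characters in a connector name that would change under percent-encoding. The function name chars_needing_encoding is hypothetical, and treating "anything urllib.parse.quote would escape" as risky is my own conservative assumption, not the rule Kafka Connect itself applies.

```python
from urllib.parse import quote


def chars_needing_encoding(name: str) -> list:
    """Return the distinct characters in `name` that percent-encoding would change."""
    # safe="" makes quote() escape everything except letters, digits and "_.-~".
    return sorted({ch for ch in name if quote(ch, safe="") != ch})


# The connector names from the question contain "|", which encodes to "%7C".
print(chars_needing_encoding("qa-s3-sink-task|1"))
print(quote("qa-s3-sink-task|1", safe=""))
```

A name for which the function returns an empty list only uses characters that are left untouched in a URL path.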
Upvotes: 0
Reputation: 49694
I got empty tasks when deploying a different connector; see "Tasks are empty after deploying ElasticsearchSinkConnector".
Adding these two settings to the config when deploying the connector will help locate why the task failed:
"errors.log.include.messages": "true",
"errors.log.enable": "true"
In my case, instead of empty tasks, the status shows why the task failed:
GET /connectors/elasticsearch-sink/status
{
"name": "elasticsearch-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.xxx.xxx.xxx:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.xxx.xxx.xxx:8083",
"trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
}
],
"type": "sink"
}
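If you want to check this from a script, here is a rough Python sketch that reads a status document shaped like the one above and reports either "no tasks", the first failed task with the first line of its trace, or "ok". The function name summarize_status and the three return strings are my own choices for illustration.

```python
def summarize_status(status: dict) -> str:
    """Summarize a /connectors/<name>/status response (hypothetical helper)."""
    tasks = status.get("tasks", [])
    if not tasks:
        # Connector exists but no tasks were created for it.
        return "no tasks"
    failed = [t for t in tasks if t.get("state") == "FAILED"]
    if failed:
        first = failed[0]
        lines = first.get("trace", "").strip().splitlines()
        reason = lines[0] if lines else "no trace"
        return f"task {first.get('id')} failed: {reason}"
    return "ok"


# Status taken from the response above (worker IDs shortened).
example = {
    "name": "elasticsearch-sink",
    "connector": {"state": "RUNNING", "worker_id": "10.xxx.xxx.xxx:8083"},
    "tasks": [{
        "id": 0,
        "state": "FAILED",
        "worker_id": "10.xxx.xxx.xxx:8083",
        "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: "
                 "Not authorized to access group: connect-elasticsearch-sink\n",
    }],
    "type": "sink",
}
print(summarize_status(example))
```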
Upvotes: 0
Reputation: 1090
Delete the connector and create it again with a different database.server.id. Repeat this process until the task(s) show up.
It worked for me after 6-7 attempts; I am not sure why. Pausing and resuming, or restarting the connector/tasks, did not help me.
Upvotes: 0
Reputation: 121
I had the same problem, so I changed the name of the connector and created a new one. It worked, but I don't know the source of the problem because there is no information in the Kafka Connect logs.
Upvotes: 4