statanly

Reputation: 85

Kafka-connect add more topics on the fly

I have an elasticsearch kafka-connect connector consuming some topics.
With the following configuration:

{
    "connection.url": "https://my-es-cluster:443",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.ignore": "true",
    "topics": "topic1,topic2",
    ...
}

Can I add more topics to it while it's running?
What will happen?
What if I remove some topics from the list and add them again later?

I'd like to add a new topic3 here:

{
    ...
    "topics": "topic1,topic2,topic3",
    ...
}

What if I remove topic2? Will the other topics be re-consumed?

{
    ...
    "topics": "topic1,topic3",
    ...
}

Upvotes: 2

Views: 703

Answers (1)

Viacheslav Shalamov

Reputation: 4417

Since you already have Kafka and Kafka Connect running, you can use the Kafka Connect REST API to check this yourself: https://docs.confluent.io/current/connect/references/restapi.html

If you add a new topic (topic3), all messages currently in that topic (that is, those not yet removed by the retention policy) will be consumed.

PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
   ...
   "topics": "topic1,topic2,topic3",
   ...
}
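
As a rough illustration of the update above, here is a minimal Python sketch using only the standard library. It assumes the host and connector name from the example; the helper names (`with_topic_added`, `build_config_put`, `add_topic`) are hypothetical. Since PUT on `/config` replaces the whole configuration, the sketch reads the current config first and sends it back with only "topics" changed.

```python
import json
import urllib.request


def with_topic_added(config: dict, topic: str) -> dict:
    """Return a copy of a connector config with `topic` appended to "topics"."""
    topics = [t.strip() for t in config.get("topics", "").split(",") if t.strip()]
    if topic not in topics:
        topics.append(topic)
    return {**config, "topics": ",".join(topics)}


def build_config_put(base_url: str, connector: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the PUT request that replaces the connector config."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{connector}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def add_topic(base_url: str, connector: str, topic: str) -> int:
    """Fetch the current config, add one topic, and PUT the full config back."""
    # Reading the config first keeps every other key unchanged in the update.
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/config") as resp:
        current = json.load(resp)
    request = build_config_put(base_url, connector, with_topic_added(current, topic))
    with urllib.request.urlopen(request) as resp:
        return resp.status
```

Calling `add_topic("http://kafka-connect:8083", "my-test-connector", "topic3")` would perform the same update as the request above, assuming the worker is reachable at that address.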

Check status and config of this connector:

GET http://kafka-connect:8083/connectors/my-test-connector
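
Note that GET on the connector itself returns its config and task list; the running state is reported by the separate `/status` endpoint. A minimal sketch of checking it after a config change, assuming the same host and connector name as above (the function names are hypothetical):

```python
import json
import urllib.request


def not_running(status: dict) -> list:
    """List the parts of a /status response whose state is not RUNNING."""
    problems = []
    if status.get("connector", {}).get("state") != "RUNNING":
        problems.append("connector")
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            problems.append(f"task-{task.get('id')}")
    return problems


def fetch_status(base_url: str, connector: str) -> dict:
    """GET /connectors/<name>/status and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        return json.load(resp)
```

An empty list from `not_running(fetch_status("http://kafka-connect:8083", "my-test-connector"))` would suggest the connector and all of its tasks came back up after the update.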

If you want to stop consuming a topic, use PUT to update the connector's config:

PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
   ...
   "topics": "topic1,topic3",
   ...
}

Nothing will change for topic1 and topic3; only topic2 will no longer be consumed.
If you add topic2 back later, its messages will be consumed from the last committed offset, not from the beginning.

The last committed offset is stored per consumer group, so it does not matter that you removed the topic from the config for a while. In this case, the consumer group will be connect-my-test-connector.
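
To see the stored offsets for that group, you could describe it with the consumer-groups command-line tool. The sketch below only builds the command; the bootstrap address `kafka:9092` is an assumption, the helper names are hypothetical, and the script is called `kafka-consumer-groups.sh` in the Apache Kafka distribution.

```python
def connect_group_id(connector: str) -> str:
    """Consumer group name used for a sink connector, as described above."""
    return f"connect-{connector}"


def describe_group_command(connector: str, bootstrap_server: str) -> list:
    """Build the argument list that describes the connector's consumer group."""
    return [
        "kafka-consumer-groups",  # kafka-consumer-groups.sh in Apache Kafka
        "--bootstrap-server", bootstrap_server,
        "--describe",
        "--group", connect_group_id(connector),
    ]


# Assumed broker address; adjust to your cluster.
command = describe_group_command("my-test-connector", "kafka:9092")
print(" ".join(command))
```

The list can be passed to `subprocess.run(command)` on a machine where the Kafka tools are installed; the output shows the committed offset per topic partition, including partitions of topics that were removed from the config.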

Even if you delete the connector (DELETE http://kafka-connect:8083/connectors/my-test-connector) and then create it again with the same name, the offsets will be kept, and consumption will continue from the point where you deleted it (mind the retention policy; it's usually 7 days).

Upvotes: 3
