Thomas Schreiter

Reputation: 810

Check whether a Kafka topic exists in Python

I want to create a Kafka topic if it does not already exist. I know how to create a topic from bash, but I don't know how to check whether it exists.

topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--create',  
        '--zookeeper', '{}:2181'.format(KAFKAHOST),
        '--topic', str(self.topic), 
        '--partitions', str(self.partitions),
        '--replication-factor', str(self.replication_factor)])

Upvotes: 7

Views: 13947

Answers (5)

Raghu

Reputation: 41

Another way: if you are using KafkaAdminClient, you can get the set of topics. Note that this goes through the private _client attribute.

from kafka import KafkaAdminClient
admin = KafkaAdminClient(**kafka_admin_client_config)
kafka_topics = admin._client.cluster.topics()

Upvotes: 0

novaXire

Reputation: 136

Use KafkaAdminClient from the kafka API. It is not documented, but a list_topics method exists!
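
A minimal sketch of that check, assuming a kafka-python release whose KafkaAdminClient provides list_topics() (the 2.0 series does, as far as I know). The helper names topic_exists and check_on_broker and the bootstrap_servers argument are illustrative and not from the original answer.

```python
def topic_exists(admin, topic):
    """Return True if `topic` is among the names reported by admin.list_topics()."""
    return topic in set(admin.list_topics())


def check_on_broker(bootstrap_servers, topic):
    """Connect to a real cluster and check one topic (needs kafka-python installed)."""
    # Imported lazily so topic_exists() itself has no dependency on kafka-python.
    from kafka import KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        return topic_exists(admin, topic)
    finally:
        admin.close()  # release the connection even if the lookup fails
```

Something like check_on_broker('localhost:9092', 'foo_bar') would then give the value for topic_exists in the question, where the address is a placeholder for your own broker.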

Upvotes: 0

Rohitashwa Nigam

Reputation: 408

Use the kafka-python consumer API for this.

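import kafka

# your_server_list and wanted_topics are placeholders for your own values
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=your_server_list)
# topics() returns the topics the consumer can see; the difference is what is missing
new_topics = set(wanted_topics) - set(consumer.topics())
for topic in new_topics:
    create(topic)  # create() stands for your own topic-creation helper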
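
The create helper in the snippet above is left undefined. One hypothetical way to fill it in is to reuse the kafka-topics.sh command from the question. This is only a sketch: the function names, the extra parameters and the defaults of 1 are my assumptions, and newer Kafka releases expect --bootstrap-server instead of --zookeeper, as far as I know.

```python
import os
import subprocess


def build_create_command(kafka_bin, zookeeper_host, topic, partitions, replication_factor):
    """Build the kafka-topics.sh --create argument list used in the question."""
    return [os.path.join(kafka_bin, 'kafka-topics.sh'),
            '--create',
            '--zookeeper', '{}:2181'.format(zookeeper_host),
            '--topic', str(topic),
            '--partitions', str(partitions),
            '--replication-factor', str(replication_factor)]


def create(topic, kafka_bin, zookeeper_host, partitions=1, replication_factor=1):
    """Run the command; check_call raises CalledProcessError on a non-zero exit."""
    subprocess.check_call(build_create_command(
        kafka_bin, zookeeper_host, topic, partitions, replication_factor))
```

To keep the one-argument create(topic) call from the loop, you could bind the other arguments first, for example with functools.partial(create, kafka_bin=KAFKABIN, zookeeper_host=KAFKAHOST).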

Upvotes: 3

Yonatan Kiron

Reputation: 2818

Another nice way is with the Python kafka module:

kafka_client = kafka.KafkaClient(kafka_server_name)
server_topics = kafka_client.topic_partitions

if topic_name in server_topics:
   your code....

kafka_client.topic_partitions is keyed by topic name, so the in check above tests whether the topic is known to the server.

Upvotes: 9

chrsblck

Reputation: 4088

You can use the --list (List all available topics) option for kafka-topics.sh and see if self.topic exists in the topics array, as shown below.

Depending on the number of topics you have, this approach might be a bit heavy. If so, you might be able to get away with using --describe (List details for the given topics), which will likely return empty output if the topic doesn't exist. I haven't thoroughly tested this, so I can't say for sure how solid the --describe solution is, but it might be worth investigating a bit further.

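import os
import subprocess

# KAFKABIN and KAFKAHOST are defined as in the question
wanted_topics = ['host_updates_queue', 'foo_bar']

# --list prints one topic name per line; split() yields exact names,
# so 'foo' does not match 'foo_bar' the way a substring test would.
topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--list',
        '--zookeeper', '{}:2181'.format(KAFKAHOST)]).decode().split()

for wanted in wanted_topics:
    if wanted in topics:
        print('\'{}\' topic exists!'.format(wanted))
    else:
        print('\'{}\' topic does NOT exist!'.format(wanted))

    # --describe is expected to print nothing for a topic that does not exist
    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--describe',
        '--topic', wanted,
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

    if not topic_desc:
        print('No description found for the topic \'{}\''.format(wanted))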

OUTPUT:

root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'
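
One caveat when matching against the raw --list output: on a plain string, in is a substring test, so 'foo_bar' would also match a topic named 'foo_bar_archive'. A small sketch of parsing the output into exact names first. The helper name parse_topic_list is mine, and it assumes one topic per line with the name as the first field. I believe some versions append a note such as "- marked for deletion" after the name, which this drops.

```python
def parse_topic_list(output):
    """Turn raw `kafka-topics.sh --list` output into a set of exact topic names."""
    if isinstance(output, bytes):  # check_output returns bytes on Python 3
        output = output.decode('utf-8')
    names = set()
    for line in output.splitlines():
        fields = line.split()
        if fields:  # skip blank lines
            names.add(fields[0])  # keep the name, drop any trailing annotation
    return names
```

With that, wanted in parse_topic_list(topics) only succeeds on an exact topic name.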

There is also a broker configuration setting available, so you don't have to take any of these steps:

Property | Default | Description
auto.create.topics.enable | true | Enable auto creation of topic on the server. If this is set to true then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.

I would take this approach if possible.

Note that you should set num.partitions and default.replication.factor in your broker's server.properties to match the settings in your code snippet.

Upvotes: 7
