Reputation: 810
I want to create a Kafka topic if it does not already exist. I know how to create a topic from bash, but I don't know how to check whether it exists.
topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
                     '--create',
                     '--zookeeper', '{}:2181'.format(KAFKAHOST),
                     '--topic', str(self.topic),
                     '--partitions', str(self.partitions),
                     '--replication-factor', str(self.replication_factor)])
Upvotes: 7
Views: 13947
Reputation: 41
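Another way, if you are using KafkaAdminClient, is to get the set of topics from its underlying client. Note that _client is a private attribute, so this may break between kafka-python versions.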
from kafka import KafkaAdminClient
admin = KafkaAdminClient(**kafka_admin_client_config)
kafka_topics = admin._client.cluster.topics()
Upvotes: 0
Reputation: 136
Use KafkaAdminClient from the kafka-python API. It is not well documented, but a list_topics method exists.
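A minimal sketch of that approach, assuming kafka-python 2.x, where `KafkaAdminClient.list_topics()` and `create_topics()` are available. The helper names `missing_topics` and `ensure_topics`, the default partition and replication values, and the broker address in the usage comment are illustrative and not part of the original answer.

```python
def missing_topics(wanted, existing):
    """Return the wanted topic names that are not in `existing`, sorted."""
    return sorted(set(wanted) - set(existing))


def ensure_topics(admin, wanted, partitions=1, replication_factor=1):
    """Create every topic in `wanted` that the cluster does not list yet.

    `admin` is assumed to be a kafka.KafkaAdminClient, or any object that
    offers the same list_topics() / create_topics() methods.
    """
    missing = missing_topics(wanted, admin.list_topics())
    if missing:
        # Imported lazily so missing_topics() stays usable without kafka-python.
        from kafka.admin import NewTopic
        admin.create_topics(new_topics=[
            NewTopic(name=name,
                     num_partitions=partitions,
                     replication_factor=replication_factor)
            for name in missing
        ])
    return missing


# Usage (placeholder broker address):
#   from kafka import KafkaAdminClient
#   admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
#   ensure_topics(admin, ['host_updates_queue', 'foo_bar'])
```

The check and the create are two separate requests, so another process can create the topic in between; kafka-python reports that case as `kafka.errors.TopicAlreadyExistsError`, which you may want to catch.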
Upvotes: 0
Reputation: 408
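Use the kafka-python consumer API for this.
import kafka
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=your_server_list)
# consumer.topics() returns the set of topics the consumer is allowed to see
new_topics = set(wanted_topics) - set(consumer.topics())
for topic in new_topics:
    create(topic)  # create() stands in for your own topic-creation code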
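The `create` call above is left undefined. One possible sketch, assuming it simply wraps the `kafka-topics.sh --create` command from the question. The function names, the default values, and the example install path and ZooKeeper host are hypothetical. Newer Kafka releases replaced `--zookeeper` with `--bootstrap-server`, so the flag may need adjusting for your version.

```python
import os
import subprocess


def build_create_command(kafka_bin, zookeeper_host, topic,
                         partitions=1, replication_factor=1):
    """Build the kafka-topics.sh argument list used in the question."""
    return [os.path.join(kafka_bin, 'kafka-topics.sh'),
            '--create',
            '--zookeeper', '{}:2181'.format(zookeeper_host),
            '--topic', str(topic),
            '--partitions', str(partitions),
            '--replication-factor', str(replication_factor)]


def create(topic, kafka_bin='/opt/kafka/bin', zookeeper_host='localhost'):
    """Run the create command and return the script's exit code."""
    return subprocess.call(
        build_create_command(kafka_bin, zookeeper_host, topic))
```

Keeping the argument list in its own function makes it possible to inspect or log the command without actually running the script.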
Upvotes: 3
Reputation: 2818
Another nice way is with the Python kafka module:
import kafka
kafka_client = kafka.KafkaClient(kafka_server_name)
server_topics = kafka_client.topic_partitions
if topic_name in server_topics:
    your code....
kafka_client.topic_partitions is a mapping keyed by topic name (not a plain list), so the membership test above checks topic names.
Upvotes: 9
Reputation: 4088
You can use the --list option (list all available topics) of kafka-topics.sh and check whether self.topic is among the returned topics, as shown below.
Depending on how many topics you have, this approach might be a bit heavy. If so, you might be able to get away with --describe (list details for the given topics), which will likely return empty output if the topic doesn't exist. I haven't thoroughly tested this, so I can't say how solid the --describe approach is, but it may be worth investigating further.
import os
import subprocess

# KAFKABIN and KAFKAHOST are the same settings used in the question.
wanted_topics = ['host_updates_queue', 'foo_bar']
kafka_topics = os.path.join(KAFKABIN, 'kafka-topics.sh')
zookeeper = '{}:2181'.format(KAFKAHOST)

# --list prints one topic per line; split into lines so we match whole
# names rather than substrings of the raw output.
topics = subprocess.check_output(
    [kafka_topics, '--list', '--zookeeper', zookeeper]).decode().splitlines()

for wanted in wanted_topics:
    if wanted in topics:
        print("'{}' topic exists!".format(wanted))
    else:
        print("'{}' topic does NOT exist!".format(wanted))
        topic_desc = subprocess.check_output(
            [kafka_topics, '--describe', '--topic', wanted,
             '--zookeeper', zookeeper])
        if not topic_desc.strip():
            print("No description found for the topic '{}'".format(wanted))
OUTPUT:
root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'
There is also a Broker Configuration available so you don't have to take any of these steps:
Property | Default | Description
auto.create.topics.enable | true | Enable auto creation of topic on the server. If this is set to true then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.
I would take this approach if possible.
Note that you should set the broker defaults num.partitions and default.replication.factor (in server.properties) to match the settings in your code snippet.
Upvotes: 7