Reputation: 479
I'm a beginner with the Kafka client in Python and need some help describing topics using the client.
I was able to list all my Kafka topics using the following code:
import kafka
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()
Upvotes: 5
Views: 10187
Reputation: 151
I have found how to do it with kafka-python:
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
KAFKA_URL = "localhost:9092" # kafka broker
KAFKA_TOPIC = "test" # topic name
admin_client = KafkaAdminClient(bootstrap_servers=[KAFKA_URL])
configs = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, KAFKA_TOPIC)])
config_list = configs.resources[0][4]
config_list (a list of tuples) then holds all the configs for the topic.
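If a name-to-value mapping is easier to work with than the raw tuples, something like the following sketch may help. It assumes that each tuple starts with the config name followed by its value, as in the DescribeConfigs response layout; the remaining fields differ between protocol versions and are ignored here. The helper name configs_to_dict and the sample entries are made up for illustration.

```python
def configs_to_dict(config_list):
    """Map config name -> config value from a list of config-entry tuples.

    Assumption: entry[0] is the name and entry[1] is the value; any
    further fields (read-only flag, source, ...) are ignored.
    """
    return {entry[0]: entry[1] for entry in config_list}


# Made-up sample entries shaped like the tuples in config_list.
sample_config_list = [
    ("retention.ms", "604800000", False, 5, False, []),
    ("cleanup.policy", "delete", False, 5, False, []),
]
topic_config = configs_to_dict(sample_config_list)
print(topic_config["retention.ms"])
```

With a real broker, configs_to_dict(config_list) would be called on the list obtained above instead of the sample data.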
Upvotes: 8
Reputation: 21
Refer: https://docs.confluent.io/current/clients/confluent-kafka-python/
confluent_kafka.admin.TopicMetadata.partitions provides confluent_kafka.admin.PartitionMetadata objects (partition id, leader, replicas, isrs), keyed by partition id.
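from confluent_kafka.admin import AdminClient

# bootstrap_servers (broker address string) and topics (list of topic names)
# are expected to be defined by the caller.
kafka_admin = AdminClient({"bootstrap.servers": bootstrap_servers})
for topic in topics:
    # list_topics returns cluster metadata restricted to the given topic
    x = kafka_admin.list_topics(topic=topic)
    print(x.topics, '\n')
    for key, value in x.topics.items():
        for keyy, valuey in value.partitions.items():
            print(keyy, ' Partition id : ', valuey, 'leader : ', valuey.leader, ' replica: ', valuey.replicas)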
Upvotes: 2
Reputation: 479
After referring to multiple articles and code samples, I was able to do this through describe_configs using confluent_kafka.
Link 1 [Confluent-kafka-python] Link 2 Git Sample
Below is my sample code:
from confluent_kafka.admin import AdminClient, ConfigResource
import confluent_kafka
import concurrent.futures

# Creation of config
conf = {'bootstrap.servers': 'kafka1', 'session.timeout.ms': 6000}
adminClient = AdminClient(conf)
# describe_configs returns a dict mapping each ConfigResource to a future
topic_configResource = adminClient.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "myTopic")])
for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
    # Each future resolves to the configuration of one requested resource
    config_response = j.result(timeout=1)
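To read the values out of config_response, a sketch along these lines might work. It assumes, as the confluent-kafka documentation describes, that each future resolves to a dict mapping config names to ConfigEntry objects that carry name and value attributes. The helper name summarize_config is hypothetical, and the stand-in entries below only mimic ConfigEntry so that the snippet runs without a broker.

```python
from types import SimpleNamespace


def summarize_config(config_response):
    """Return {config name: value} from a dict of ConfigEntry-like objects."""
    return {name: entry.value for name, entry in config_response.items()}


# Stand-ins for ConfigEntry objects (assumption: only .value is needed).
fake_response = {
    "retention.ms": SimpleNamespace(name="retention.ms", value="604800000"),
    "cleanup.policy": SimpleNamespace(name="cleanup.policy", value="delete"),
}
for name, value in sorted(summarize_config(fake_response).items()):
    print(name, "=", value)
```

In the loop above, summarize_config(config_response) would be called on each j.result().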
Upvotes: 14
Reputation: 2014
Interestingly, for Java this functionality (describeTopics()) sits within KafkaAdminClient.java.
So I looked for the Python equivalent and found the code repository of kafka-python.
The documentation (inline comments) in the admin client equivalent in the kafka-python package says the following:
describe topics functionality is in ClusterMetadata Note: if implemented here, send the request to the controller
I then switched to the cluster.py file in the same repository. This contains the topics() function that you've used to retrieve the list of topics, and the following two functions that could help you achieve the describe functionality:
- partitions_for_topic(): Return set of all partitions for topic (whether available or not)
- available_partitions_for_topic(): Return set of partitions with known leaders
Note: I haven't tried this myself, so I'm not entirely sure whether the behaviour would be identical to what you would see in the result of the kafka-topics --describe ... command, but it's worth a try.
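The two functions could be combined roughly as follows. This is only a sketch that has not been run against a broker: describe_topic is a hypothetical helper, it accepts any object that exposes partitions_for_topic() and available_partitions_for_topic() the way ClusterMetadata in cluster.py does, and the FakeCluster class exists purely so the example runs on its own.

```python
def describe_topic(cluster, topic):
    """Summarise partition availability for one topic.

    `cluster` is assumed to behave like kafka-python's ClusterMetadata:
    both methods return a set of partition ids, or None for an unknown topic.
    """
    all_partitions = cluster.partitions_for_topic(topic) or set()
    available = cluster.available_partitions_for_topic(topic) or set()
    return {
        "topic": topic,
        "partitions": sorted(all_partitions),
        "without_leader": sorted(all_partitions - available),
    }


class FakeCluster:
    """Stand-in for ClusterMetadata with hard-coded, made-up metadata."""

    def partitions_for_topic(self, topic):
        return {0, 1, 2} if topic == "test" else None

    def available_partitions_for_topic(self, topic):
        return {0, 2} if topic == "test" else None


summary = describe_topic(FakeCluster(), "test")
print(summary)
```

This reports only partition ids and whether a leader is known, so it covers less than kafka-topics --describe, which also lists replicas and in-sync replicas.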
I hope this helps!
Upvotes: 0