Ashwin

Reputation: 479

How to describe a topic using kafka client in Python

I'm a beginner with the Kafka client in Python, and I need some help describing topics using the client.

I was able to list all my Kafka topics using the following code:

import kafka

consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()

Upvotes: 5

Views: 10187

Answers (4)

Alvaro del Castillo

Reputation: 151

I have found how to do it with kafka-python:

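from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

KAFKA_URL = "localhost:9092"  # kafka broker
KAFKA_TOPIC = "test"  # topic name

admin_client = KafkaAdminClient(bootstrap_servers=[KAFKA_URL])
configs = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, KAFKA_TOPIC)])
# Each resource is (error_code, error_message, resource_type, resource_name, config_entries),
# so index 4 holds the config entries for the topic.
# Note: some newer kafka-python releases may return a list of responses;
# in that case this would be configs[0].resources[0][4].
config_list = configs.resources[0][4]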

In config_list (list of tuples) you have all the configs for the topic.
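
To make config_list easier to query, the tuples can be folded into a dict. This is only a minimal sketch, assuming each entry starts with the config name followed by its value (the remaining fields vary by protocol version). The helper name entries_to_dict and the sample entries are made up for illustration and do not come from a real broker:

```python
def entries_to_dict(config_list):
    """Map config name -> value, ignoring any trailing fields in each tuple."""
    return {entry[0]: entry[1] for entry in config_list}


# Hypothetical sample shaped like config_list: (name, value, extra flags...)
sample = [
    ("retention.ms", "604800000", False, True, False),
    ("cleanup.policy", "delete", False, True, False),
]
topic_config = entries_to_dict(sample)
print(topic_config["retention.ms"])
```

With a real cluster you would pass the config_list obtained above instead of the sample.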

Upvotes: 8

Lav Kush

Reputation: 21

Refer: https://docs.confluent.io/current/clients/confluent-kafka-python/

  1. list_topics returns cluster metadata whose topics map each topic name to a confluent_kafka.admin.TopicMetadata (topic, partitions)
  2. confluent_kafka.admin.TopicMetadata.partitions maps each partition id to a confluent_kafka.admin.PartitionMetadata (partition id, leader, replicas, isrs)

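    from confluent_kafka.admin import AdminClient

    bootstrap_servers = "localhost:9092"  # assumption: replace with your broker address
    topics = ["test"]  # assumption: replace with the topics to describe

    kafka_admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    for topic in topics:
        # list_topics returns cluster metadata; .topics maps topic name -> TopicMetadata
        metadata = kafka_admin.list_topics(topic=topic)
        print(metadata.topics, '\n')
        for name, topic_metadata in metadata.topics.items():
            # .partitions maps partition id -> PartitionMetadata
            for partition_id, partition in topic_metadata.partitions.items():
                print('Partition id:', partition_id, 'leader:', partition.leader, 'replicas:', partition.replicas)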
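
If output closer to kafka-topics --describe is wanted, the same metadata can be formatted one line per partition. A rough sketch: describe_lines is a hypothetical helper, and the SimpleNamespace objects are stand-ins for PartitionMetadata (leader, replicas, isrs) so the snippet runs without a broker:

```python
from types import SimpleNamespace


def describe_lines(topic_name, partitions):
    """Return one summary line per partition, sorted by partition id."""
    lines = []
    for partition_id in sorted(partitions):
        p = partitions[partition_id]
        lines.append(
            f"Topic: {topic_name} Partition: {partition_id} "
            f"Leader: {p.leader} Replicas: {p.replicas} Isr: {p.isrs}"
        )
    return lines


# Stand-in data; with a real cluster, pass the partitions dict of a TopicMetadata
fake_partitions = {
    1: SimpleNamespace(leader=2, replicas=[2, 1], isrs=[2, 1]),
    0: SimpleNamespace(leader=1, replicas=[1, 2], isrs=[1]),
}
for line in describe_lines("test", fake_partitions):
    print(line)
```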
    

Upvotes: 2

Ashwin

Reputation: 479

After referring to multiple articles and code samples, I was able to do this through describe_configs using confluent_kafka.

Links: [Confluent-kafka-python], Git sample

Below is my sample code:

from confluent_kafka.admin import AdminClient, ConfigResource
import confluent_kafka
import concurrent.futures

# Create the admin client
conf = {'bootstrap.servers': 'kafka1', 'session.timeout.ms': 6000}
adminClient = AdminClient(conf)
# describe_configs returns a dict mapping each ConfigResource to a future
topic_configResource = adminClient.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "myTopic")])
for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
    # The result is a dict of config name -> ConfigEntry
    config_response = j.result(timeout=1)
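
The result of each future is a dict of config names to ConfigEntry objects. One possible use, sketched here, is to keep only the settings that differ from the defaults. overridden_settings is a hypothetical helper, and the SimpleNamespace entries stand in for ConfigEntry (assumed to expose value and is_default) so the example runs without a cluster:

```python
from types import SimpleNamespace


def overridden_settings(config_response):
    """Return {name: value} for entries that are not at their default value."""
    return {
        name: entry.value
        for name, entry in config_response.items()
        if not entry.is_default
    }


# Stand-in for the dict returned by j.result(); the values are invented
fake_response = {
    "retention.ms": SimpleNamespace(value="3600000", is_default=False),
    "cleanup.policy": SimpleNamespace(value="delete", is_default=True),
}
print(overridden_settings(fake_response))
```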

Upvotes: 14

Lalit

Reputation: 2014

Interestingly, for Java this functionality (describeTopics()) sits within KafkaAdminClient.java.

So I looked for the Python equivalent and found the kafka-python code repository.

The in-line comments in the admin client equivalent in the kafka-python package say the following:

describe topics functionality is in ClusterMetadata
Note: if implemented here, send the request to the controller

I then switched to the cluster.py file in the same repository. It contains the topics() function that you've used to retrieve the list of topics, plus the following two functions that could help you achieve the describe functionality:

  1. partitions_for_topic() - Return set of all partitions for topic (whether available or not)
  2. available_partitions_for_topic() - Return set of partitions with known leaders
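
Untested against a live cluster, but the two functions could be combined along these lines to spot partitions without a known leader. partition_summary and FakeCluster are hypothetical names; FakeCluster only mimics the two method names listed above so that the sketch runs standalone:

```python
def partition_summary(cluster, topic):
    """Summarise partition availability using the two metadata calls."""
    # Both calls may return None for an unknown topic, so fall back to an empty set
    all_partitions = cluster.partitions_for_topic(topic) or set()
    available = cluster.available_partitions_for_topic(topic) or set()
    return {
        "partitions": sorted(all_partitions),
        "leaderless": sorted(all_partitions - available),
    }


class FakeCluster:
    """Stand-in exposing the same two method names as the cluster metadata."""

    def partitions_for_topic(self, topic):
        return {0, 1, 2} if topic == "test" else None

    def available_partitions_for_topic(self, topic):
        return {0, 2} if topic == "test" else None


print(partition_summary(FakeCluster(), "test"))
```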

Note: I haven't tried this myself, so I'm not entirely sure the behaviour would be identical to what you see from the kafka-topics --describe ... command, but it's worth a try.

I hope this helps!

Upvotes: 0
