stewvsshark

Reputation: 21

How to get the latest offset from each partition using kafka-python?

I'm trying to get the latest offset (not committed offset) from each partition for a given topic.

from kafka import KafkaConsumer, TopicPartition

topic = 'test-topic'
broker = 'localhost:9092'

consumer = KafkaConsumer(bootstrap_servers=broker)

tp = TopicPartition(topic, 0)        #1
consumer.assign([tp])                #2
consumer.seek_to_end(tp)             #3
last_offset = consumer.position(tp)  #4

for i in consumer.partitions_for_topic(topic):
    tp = TopicPartition(topic, i)
    consumer.assign([tp])
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    print(last_offset)

The preceding code does work and prints the offset of each partition. However, notice that I have the same 4 lines outside the loop as well as inside it. If I remove any of the lines #1 - #4 (the 4 lines directly preceding the for loop), I get this error:

  File "check_kafka_offset.py", line 19, in <module>
    for i in consumer.partitions_for_topic(topic):
TypeError: 'NoneType' object is not iterable

Why do I need to have the 4 lines before the for loop?
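A sketch of one way to avoid the duplicated lines, assuming kafka-python's KafkaConsumer API (partitions_for_topic, topics, end_offsets). The TypeError means partitions_for_topic returned None. As I understand it, this happens when the consumer has no cached metadata for the topic yet, and the assign/seek_to_end/position calls before the loop happen to trigger a metadata fetch. The hypothetical helper latest_offsets below requests metadata explicitly via consumer.topics() when needed (assuming, as I believe, that topics() refreshes the client's cluster metadata). It then fetches every partition's end offset in a single end_offsets call. The TopicPartition fallback exists only so the sketch runs without kafka-python installed.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:
    # Assumption: stand-in mirroring kafka-python's namedtuple, only so this sketch runs offline.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def latest_offsets(consumer, topic):
    """Return {partition_id: end_offset} for every partition of `topic`.

    `consumer` is expected to behave like kafka-python's KafkaConsumer.
    """
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        # No metadata cached for this topic yet: topics() asks the cluster
        # for fresh metadata, then we look the partitions up again.
        consumer.topics()
        partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        raise ValueError(f"topic {topic!r} not found in cluster metadata")

    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    # One request covers all partitions; no assign()/seek_to_end()/position() needed.
    end = consumer.end_offsets(tps)
    return {tp.partition: end[tp] for tp in tps}
```

With a real broker this would be called as latest_offsets(KafkaConsumer(bootstrap_servers=broker), topic), and the loop body reduces to printing each (partition, offset) pair of the returned dict.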

Upvotes: 2

Views: 15189

Answers (2)

prossblad

Reputation: 944

Here is a simple and well-documented function:

from kafka import TopicPartition
def getTopicInfos(consumer, topic: str):
    """
    Get a topic's information: its partitions and their last offsets.
    Example of result: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']}

    - Parameters:
      consumer: A Kafka consumer.
      topic: A topic name.

    - Return:
      A dict with the topic name and its partition/last-offset pairs.
    """
    # Get topic-partition pairs
    # E.g: [TopicPartition(topic='myTopic', partition=0), TopicPartition(topic='myTopic', partition=1)]
    tp = [TopicPartition(topic, partition) for partition in consumer.partitions_for_topic(topic)]

    # Get last offsets
    # E.g: {TopicPartition(topic='myTopic', partition=0): 47, TopicPartition(topic='myTopic', partition=1): 98}
    tplo = consumer.end_offsets(tp)

    # Format partition-lastOffset pairs
    # E.g: ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']
    plo = ['{' + f'"partition": {item.partition}, "lastOffset": {tplo.get(item)}' + '}' for item in tplo]

    # Concat topic with partition-lastOffset pairs
    # E.g: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']}
    tplo = {"topic": topic, "partitions": plo}

    # Return the result
    return tplo
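If you want the result as data rather than pre-formatted JSON strings, a variant along these lines may be easier to work with. This is my own sketch, not the method above. The hypothetical get_topic_info keeps each partition as a plain dict, sorts by partition id, and leaves serialization to json.dumps for when you actually need a string. It assumes a kafka-python KafkaConsumer (partitions_for_topic, end_offsets). The TopicPartition fallback only exists so it runs without the library.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:
    # Assumption: stand-in mirroring kafka-python's namedtuple, only so this sketch runs offline.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def get_topic_info(consumer, topic):
    """Return {"topic": ..., "partitions": [{"partition": p, "lastOffset": o}, ...]}."""
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        # Same failure mode as in the question: no metadata for this topic yet.
        raise ValueError(f"no metadata for topic {topic!r}")

    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    end = consumer.end_offsets(tps)  # {TopicPartition: next offset to be written}
    return {
        "topic": topic,
        "partitions": [{"partition": tp.partition, "lastOffset": end[tp]} for tp in tps],
    }
```

For example, json.dumps(get_topic_info(consumer, 'myTopic')) produces a single JSON document instead of JSON strings embedded in a Python dict.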

Upvotes: 1

Chris Matta

Reputation: 3433

You can use that client's end_offsets(partitions) method to get the last offset for the specified partitions. Note that the returned offset is the offset of the next message to be written, that is, the offset of the last existing message + 1. See the kafka-python documentation for KafkaConsumer.end_offsets.
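Because end_offsets returns the offset of the next message to be written, getting the offset of the last stored message takes one more step. A minimal sketch, assuming kafka-python's beginning_offsets and end_offsets (both take a list of TopicPartition and return a dict keyed by them). last_message_offsets is a hypothetical helper. Caveat: with transactional producers, the offset just below the end can be a transaction control marker rather than a readable record, so treat the result as approximate in that case.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:
    # Assumption: stand-in mirroring kafka-python's namedtuple, only so this sketch runs offline.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def last_message_offsets(consumer, partitions):
    """Map each TopicPartition to the offset of its last stored message, or None if empty.

    end_offsets() is one past the last message; a partition whose beginning
    and end offsets are equal currently holds no messages.
    """
    begin = consumer.beginning_offsets(partitions)
    end = consumer.end_offsets(partitions)
    return {
        tp: (end[tp] - 1 if end[tp] > begin[tp] else None)
        for tp in partitions
    }
```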

Edit: Example implementation:

from kafka import KafkaConsumer, TopicPartition
import json
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

BOOTSTRAP = "cluster:9092"
API_KEY = "redacted"
API_SECRET = "redacted"
TOPIC = "python-test"

consumer = KafkaConsumer(
    group_id="my-group",
    bootstrap_servers=[BOOTSTRAP],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=API_KEY,
    sasl_plain_password=API_SECRET,
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    auto_offset_reset='earliest'
)

PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
    PARTITIONS.append(TopicPartition(TOPIC, partition))
    
end_offsets = consumer.end_offsets(PARTITIONS)
print(end_offsets)

The returned end_offsets dict looks like this:

{TopicPartition(topic=u'python-test', partition=0): 5,
 TopicPartition(topic=u'python-test', partition=1): 20,
 TopicPartition(topic=u'python-test', partition=2): 0}
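To get output like the question's loop (one line per partition), the dict can be reordered by partition id. A small sketch using the values shown above. offsets_by_partition is a hypothetical helper, and it assumes every key belongs to the same topic, since partition ids from different topics would collide.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:
    # Assumption: stand-in mirroring kafka-python's namedtuple, only so this sketch runs offline.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def offsets_by_partition(end_offsets):
    """Turn {TopicPartition: offset} into {partition_id: offset}, ordered by partition id."""
    ordered = sorted(end_offsets.items(), key=lambda kv: kv[0].partition)
    return {tp.partition: offset for tp, offset in ordered}


# The values from the output above, deliberately inserted out of order.
end_offsets = {
    TopicPartition("python-test", 2): 0,
    TopicPartition("python-test", 0): 5,
    TopicPartition("python-test", 1): 20,
}
for partition, offset in offsets_by_partition(end_offsets).items():
    print(partition, offset)  # one "partition offset" line each
```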

Upvotes: 5
