user5892612

kafka-python KafkaConsumer multiple partition commit offset

Is it possible at all to commit offsets to a Kafka topic that has multiple partitions, so that offset1 can be committed to partition1, offset2 to p2, and so on?

EDIT:

Yes, it is possible:

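from kafka import KafkaConsumer
from kafka.structs import TopicPartition, OffsetAndMetadata

# group_id is required for commits; the broker address and group name are examples
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='mygroup')
topicpartitions = [TopicPartition('topicname', partitionId)
                   for partitionId in consumer.partitions_for_topic('topicname')]

# Manually assign every partition, then commit an offset per partition (each may differ)
consumer.assign(topicpartitions)
for tp in topicpartitions:
    # newer kafka-python versions may also expect a leader_epoch field here
    consumer.commit({tp: OffsetAndMetadata(1000, None)})

for msg in consumer:
    pass  # do whatever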

Upvotes: 3

Views: 3566

Answers (1)

Hans Jespersen

Reputation: 8335

Kafka offsets are always per partition. That is, if your topic has 2 partitions, the messages in p0 start at offset 0 and increase by 1 for each new message, and the messages in p1 similarly start at offset 0 and increase by 1.

So if you published two messages (without keys), one might go into partition 0 at offset 0 and the other into partition 1, also at offset 0.
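To make the per-partition numbering concrete, here is a toy in-memory model. The `PartitionedLog` class is hypothetical and is not how a broker stores data; it only illustrates that each partition counts its offsets independently of the others.

```python
class PartitionedLog:
    """Toy model of a topic: every partition numbers its own messages from 0."""

    def __init__(self, num_partitions):
        # One append-only list per partition; list index == offset
        self.partitions = {p: [] for p in range(num_partitions)}

    def append(self, partition, value):
        log = self.partitions[partition]
        offset = len(log)  # next offset within this partition only
        log.append(value)
        return offset


log = PartitionedLog(2)
first = log.append(0, "a")   # partition 0, offset 0
second = log.append(1, "b")  # partition 1 also starts at offset 0
third = log.append(0, "c")   # partition 0 moves on to offset 1
print(first, second, third)  # 0 0 1
```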

Now if another app consumes this topic and commits its offsets, it produces messages to the __consumer_offsets topic that include its group.id, topic, partition number, and offset. For example, something like {"myconsumerid","mytopic",P0,1} and {"myconsumerid","mytopic",P1,1}. The committed value is the next offset to read, so after consuming offset 0 in each partition the app commits 1.
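A minimal sketch of what those commit records amount to, assuming a plain dict as a stand-in for __consumer_offsets (the real topic holds binary-encoded records, and the helper names `commit` and `committed` here are hypothetical). Because the topic is compacted, only the latest commit per (group.id, topic, partition) key matters, which a dict overwrite mimics.

```python
# Hypothetical in-memory stand-in for __consumer_offsets: the real topic holds
# binary-encoded records, but they are keyed by (group.id, topic, partition)
committed_offsets = {}


def commit(group_id, topic, partition, offset):
    # A newer commit for the same key replaces the older one,
    # much like log compaction keeps only the latest record per key
    committed_offsets[(group_id, topic, partition)] = offset


def committed(group_id, topic, partition):
    # None means the group has never committed for this partition
    return committed_offsets.get((group_id, topic, partition))


# After reading offset 0 from each partition, the next offset to read is 1
commit("myconsumerid", "mytopic", 0, 1)
commit("myconsumerid", "mytopic", 1, 1)
# Later, after reading further into partition 0 only
commit("myconsumerid", "mytopic", 0, 5)

print(committed("myconsumerid", "mytopic", 0))  # 5
print(committed("myconsumerid", "mytopic", 1))  # 1
```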

Should the app stop and one or two other consumers start with the same group.id, they will continue from the last committed offset for whichever partition(s) they are assigned.
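The resume rule can be sketched as a lookup per assigned partition. This is an illustration under stated assumptions: `starting_offsets` is a hypothetical helper, and `reset_offset` is only a stand-in for whatever the consumer's auto.offset.reset setting would choose when a partition has no committed offset.

```python
def starting_offsets(committed, assigned, reset_offset=0):
    """Where each assigned partition resumes for a consumer in the group.

    committed:    {partition: offset} last committed by the group
    assigned:     partitions this consumer now owns after a rebalance
    reset_offset: stand-in for the auto.offset.reset fallback
    """
    return {p: committed.get(p, reset_offset) for p in assigned}


group_commits = {0: 1, 1: 1}

# One replacement consumer gets both partitions...
print(starting_offsets(group_commits, [0, 1]))  # {0: 1, 1: 1}
# ...or two replacement consumers split them
print(starting_offsets(group_commits, [0]))  # {0: 1}
print(starting_offsets(group_commits, [1]))  # {1: 1}
```

Whichever way the partitions are split, each consumer picks up exactly where the group left off on the partitions it owns.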

If you want to reposition a consumer group's offsets to any other position, you can change the committed offsets for the group with the kafka-consumer-groups tool, which gained a --reset-offsets option in Kafka 0.11:

bin/kafka-consumer-groups.sh --reset-offsets

This tool lets you set the offset for each partition independently; for example, --topic mytopic:0 --to-offset 1000 restricts the reset to partition 0 of mytopic.

You can call this tool from a Python program if you like. All existing consumers in the consumer group should be shut down first, or they may overwrite the offsets.
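A sketch of driving the CLI from Python with `subprocess`, one invocation per partition. The helper names are hypothetical, the `bin/kafka-consumer-groups.sh` path assumes you run from the Kafka install directory, and the flags are the ones documented for recent Kafka releases (check `--help` for your version). The default is `--dry-run`, which only prints the planned offsets; pass `execute=True` to apply them.

```python
import subprocess


def reset_offset_command(bootstrap, group, topic, partition, offset, execute=False):
    """Build the kafka-consumer-groups.sh call that moves one partition's offset."""
    return [
        "bin/kafka-consumer-groups.sh",
        "--bootstrap-server", bootstrap,
        "--group", group,
        "--reset-offsets",
        "--topic", f"{topic}:{partition}",  # limit the reset to a single partition
        "--to-offset", str(offset),
        # --dry-run only prints the planned offsets; --execute applies them
        "--execute" if execute else "--dry-run",
    ]


def reset_offsets(bootstrap, group, topic, offsets, execute=False):
    """Run the tool once per partition; offsets maps partition -> target offset."""
    for partition, offset in sorted(offsets.items()):
        cmd = reset_offset_command(bootstrap, group, topic, partition, offset, execute)
        subprocess.run(cmd, check=True)  # raises CalledProcessError on failure
```

For example, `reset_offsets("localhost:9092", "myconsumerid", "mytopic", {0: 1000, 1: 2000}, execute=True)` would move partition 0 to 1000 and partition 1 to 2000, provided the group has no active members.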

If you want to write a Python version of this tool rather than running the existing CLI command, you need a Python client that supports seek(), so that you can move the offsets to where you want them and then commit them at that position for when the consuming app restarts. An alternative is to forgo dynamic partition assignment, manually assign() the partitions you want to change, and commit offsets for that assigned list. You cannot use dynamically managed partition subscriptions (subscribe()) and manually assigned partitions (assign()) at the same time on the same consumer.
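A minimal sketch of the assign-seek-commit approach with kafka-python, assuming a `KafkaConsumer` created with a `group_id` and never subscribed. The function name is hypothetical; it uses only `assign()`, `seek()` and `commit()`. Called with no arguments, kafka-python's `commit()` commits the current positions of the assigned partitions, which after `seek()` are the target offsets.

```python
def reposition_and_commit(consumer, offsets):
    """Commit chosen per-partition offsets for the consumer's group.

    consumer: a kafka-python KafkaConsumer built with a group_id and never
              subscribed, so that assign() is allowed
    offsets:  {TopicPartition: int} target offset for each partition
    """
    partitions = list(offsets)
    consumer.assign(partitions)  # manual assignment, no group rebalance
    for tp, offset in offsets.items():
        consumer.seek(tp, offset)  # move this partition's fetch position
    # With no arguments, commit() commits the current positions
    # of all assigned partitions, i.e. the offsets just seeked to
    consumer.commit()


# Example (needs kafka-python and a reachable broker; names are placeholders):
# from kafka import KafkaConsumer
# from kafka.structs import TopicPartition
# consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
#                          group_id="myconsumerid", enable_auto_commit=False)
# reposition_and_commit(consumer, {TopicPartition("mytopic", 0): 1000,
#                                  TopicPartition("mytopic", 1): 2000})
# consumer.close()
```

Disabling auto-commit on this short-lived consumer keeps it from committing anything other than the explicit `commit()` call.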

You will also need to ensure that all other consumers in the same consumer group that read these partitions are shut down; otherwise they will overwrite the offsets you have just set as soon as they auto-commit or manually commit their own.

Upvotes: 1
