Reputation: 903
Code:
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=["localhost:9092"], group_id=None, auto_offset_reset='smallest')
# Print the offset of every message received from the topic
for msg in consumer:
    print(msg.offset)
Output:
0
1
2
.
.
16
I have a total of 16 messages in the topic test.
According to the accepted answer to "What determines Kafka consumer offset?": You have a consumer in a consumer group group1 that has consumed 5 messages and died. Next time you start this consumer, it won't even use that auto.offset.reset config and will continue from the place it died, because it will just fetch the stored offset from the offset storage.
As per the Python API documentation (http://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html), enable_auto_commit is True by default, which means the consumer's offset should be committed in the background. But when I stop and rerun the code above multiple times, I get the same output every time, which is not what I would expect if enable_auto_commit is True by default (assuming the rule is the same for any API, whether Java or Python).
Thanks.
Upvotes: 1
Views: 2957
Reputation: 903
As per the documentation,
group_id (str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled.
In my code, group_id was None. I changed it to a group name, and the offsets were then committed.
consumer = KafkaConsumer('test',bootstrap_servers=["localhost:9092"],group_id='my_group', auto_offset_reset='smallest')
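To make the difference concrete without needing a broker, here is a rough toy model of the rule quoted from the documentation. It is not kafka-python and does not reflect how the library is implemented; ToyBroker, ToyConsumer and run_twice are hypothetical names made up for this sketch, and it assumes a single partition and that everything read gets committed.

```python
# Toy in-memory model of the documented rule; this is NOT kafka-python.
# ToyBroker, ToyConsumer and run_twice are hypothetical, for illustration only.

class ToyBroker:
    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = {}  # group_id -> next offset to read


class ToyConsumer:
    def __init__(self, broker, group_id=None, auto_offset_reset="earliest"):
        self.broker = broker
        self.group_id = group_id
        self.auto_offset_reset = auto_offset_reset

    def _start_offset(self):
        # A stored offset for the group wins over auto_offset_reset.
        if self.group_id is not None and self.group_id in self.broker.committed:
            return self.broker.committed[self.group_id]
        # No stored offset: fall back to the reset policy.
        if self.auto_offset_reset == "earliest":
            return 0
        return len(self.broker.messages)

    def poll_all(self):
        start = self._start_offset()
        offsets = list(range(start, len(self.broker.messages)))
        # Commits only happen when the consumer belongs to a group.
        if self.group_id is not None:
            self.broker.committed[self.group_id] = len(self.broker.messages)
        return offsets


def run_twice(group_id, n_messages=3):
    """Start a consumer, read everything, then 'restart' and read again."""
    broker = ToyBroker(["m%d" % i for i in range(n_messages)])
    first = ToyConsumer(broker, group_id=group_id).poll_all()
    second = ToyConsumer(broker, group_id=group_id).poll_all()
    return first, second


if __name__ == "__main__":
    print(run_twice(None))        # both runs start from offset 0
    print(run_twice("my_group"))  # second run resumes after the commit
```

With group_id=None the second run starts from the beginning again, which matches what I was seeing. With a group name, the second run picks up from the stored offset and has nothing left to read.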
Upvotes: 2