I have a topic with 40 partitions. My settings are as follows:
```python
from confluent_kafka import Consumer


def on_assign(c, ps):
    # Try to start every assigned partition at offset 0
    for p in ps:
        p.offset = 0
    print(ps)
    c.assign(ps)


conf = {'bootstrap.servers': 'localhost:9092',
        'enable.auto.commit': False,
        'group.id': 'confluent_consumer',
        'default.topic.config': {'auto.offset.reset': 'earliest'}
        }
consumer = Consumer(conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)
msg = consumer.poll(timeout=100000)
# poll() returns None if nothing arrives before the timeout
if msg is not None and not msg.error():
    print("Topic is %s: | Partition is %d: | Offset is : %d | key is :%s "
          % (msg.topic(), msg.partition(), msg.offset(), msg.key()))
```
I want to read from offset 0 for all partitions of the topic `topic.source`. But I don't see it happening for all partitions: for some partitions it reads from a specific offset, which I'm assuming is the committed offset. Changing the `group.id` every time doesn't help either. How can I read from the beginning of all partitions of this topic, irrespective of the committed offsets?
I printed `ps` in `on_assign()` and it printed something like this, covering all 40 partitions:

```
[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....]
```

and so on.
If you set `group.id` to a new value, or use a group that has not committed any offsets, with `auto.offset.reset` set to `earliest`, then the consumer will start from the beginning of each partition.
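A minimal sketch of that first approach, assuming the confluent-kafka Python client and a broker on `localhost:9092` as in the question. A random `group.id` guarantees the group has no committed offsets, so `auto.offset.reset=earliest` applies to every partition. The helper `fresh_group_config` and its `prefix` parameter are hypothetical names chosen for illustration.

```python
import uuid


def fresh_group_config(bootstrap_servers, prefix="replay"):
    """Build a consumer config for a brand-new consumer group.

    A group that has never committed offsets falls back to
    auto.offset.reset, so 'earliest' starts every partition at the
    first offset the broker still retains.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        # Random suffix: no offsets can have been committed for this group.
        "group.id": "%s-%s" % (prefix, uuid.uuid4()),
        "auto.offset.reset": "earliest",
        # Never commit offsets, matching the question's settings.
        "enable.auto.commit": False,
    }


def main():
    # Imported here so the helper above can be used without the client installed.
    from confluent_kafka import Consumer

    consumer = Consumer(fresh_group_config("localhost:9092"))
    consumer.subscribe(["topic.source"])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: %s" % msg.error())
                continue
            print("Topic %s | Partition %d | Offset %d | Key %s"
                  % (msg.topic(), msg.partition(), msg.offset(), msg.key()))
    finally:
        consumer.close()
```

Calling `main()` starts the loop; interrupting it with Ctrl+C runs the `finally` block, which closes the consumer.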
That said, the beginning might not be offset 0. Depending on your broker's log retention settings (e.g. `log.retention.hours`), Kafka may already have deleted older messages, so the first available message in a partition could be at any offset.
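Rather than hardcoding offset 0 in `on_assign` as the question does, a sketch (again assuming the confluent-kafka client) can assign each partition the logical offset `OFFSET_BEGINNING`, which the client resolves to the earliest retained offset, and call `get_watermark_offsets()` to report where each partition actually starts. This does not make older messages reappear; it only avoids depending on offset 0 existing. The helper `describe_range` is a hypothetical name.

```python
def describe_range(low, high):
    """Summarise a partition's retained range from its low/high watermarks.

    low is the first offset the broker still stores; high is the offset
    the next produced message will get (last offset + 1).
    """
    if high <= low:
        return "empty (next offset %d)" % high
    # Compaction or transaction markers can make the real count smaller.
    return "offsets %d..%d (up to %d messages)" % (low, high - 1, high - low)


def main():
    # Imported here so describe_range() can be used without the client installed.
    from confluent_kafka import Consumer, OFFSET_BEGINNING

    def on_assign(consumer, partitions):
        for p in partitions:
            # Logical offset meaning "earliest retained", not literal offset 0.
            p.offset = OFFSET_BEGINNING
            watermarks = consumer.get_watermark_offsets(p, timeout=5.0)
            if watermarks is not None:
                low, high = watermarks
                print("partition %d: %s" % (p.partition, describe_range(low, high)))
        # An explicit offset passed to assign() takes precedence over committed offsets.
        consumer.assign(partitions)

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "confluent_consumer",
        "enable.auto.commit": False,
    })
    consumer.subscribe(["topic.source"], on_assign=on_assign)
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                continue
            print("Partition %d | Offset %d" % (msg.partition(), msg.offset()))
    finally:
        consumer.close()
```

If a partition prints something like `offsets 1200..1249`, the first message you receive from it at offset 1200 is already the beginning of that partition.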