NoName

Reputation: 1749

Confluent Kafka: Consumer does not read from beginning for all partitions in a topic

I have a topic with 40 partitions. Settings are such:

from confluent_kafka import Consumer

def on_assign(c, ps):
    for p in ps:
        p.offset = 0
    print(ps)
    c.assign(ps)

conf = {'bootstrap.servers': 'localhost:9092',
        'enable.auto.commit': False,
        'group.id': 'confluent_consumer',
        'default.topic.config': {'auto.offset.reset': 'earliest'}
        }
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)

msg = consumer.poll(timeout=100000)
print("Topic is %s: | Partition is %d: | Offset is : %d | key is :%s" %
      (msg.topic(), msg.partition(), msg.offset(), msg.key()))

I want to read from offset 0 for all partitions of the topic topic.source, but I don't see it happening for all partitions. For some partitions it reads from a specific offset, which I assume is the committed offset; changing the group.id every time doesn't help either. How can I read from the beginning for all partitions of this topic, irrespective of the committed offsets?

I printed ps in on_assign() and it printed something like this for all 40 partitions:

[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on

Upvotes: 4

Views: 9127

Answers (1)

Mickael Maison

Reputation: 26885

If you set group.id to a new value, or use a group that has not committed any offsets, and auto.offset.reset is set to earliest, then the consumer will start from the beginning of each partition.
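A minimal sketch of that approach (the helper name fresh_group_config and the group id are mine; note that recent confluent-kafka versions accept auto.offset.reset at the top level of the config rather than under default.topic.config):

```python
def fresh_group_config(group_id):
    # A group that has never committed offsets, combined with
    # auto.offset.reset=earliest, starts at the first available offset.
    return {
        'bootstrap.servers': 'localhost:9092',
        'group.id': group_id,              # pick a previously unused name
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
    }

# With confluent_kafka installed and a broker running:
# consumer = Consumer(fresh_group_config('fresh_readers'))
# consumer.subscribe(['topic.source'])
```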

That said, the beginning might not be offset 0. Depending on your broker's log retention settings, Kafka may have deleted older messages, so the first available message in a partition could be at any offset.
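If the goal is to re-read every partition from the start no matter what any group has committed, you can still seek in the on_assign callback, but with the logical OFFSET_BEGINNING constant rather than a literal 0, so the broker resolves it to whatever the current log-start offset is. A sketch (rewind and consume_from_beginning are names I've made up; running the consumer part needs a reachable broker):

```python
# OFFSET_BEGINNING is librdkafka's logical "start of partition" offset (-2);
# confluent_kafka exports the same constant.
OFFSET_BEGINNING = -2

def rewind(partitions):
    # Point every assigned TopicPartition at the true log start,
    # overriding any offset the group may have committed.
    for p in partitions:
        p.offset = OFFSET_BEGINNING
    return partitions

def consume_from_beginning(broker, topic, group):
    from confluent_kafka import Consumer  # imported lazily; needs a broker
    consumer = Consumer({'bootstrap.servers': broker,
                         'group.id': group,
                         'enable.auto.commit': False})
    consumer.subscribe([topic], on_assign=lambda c, ps: c.assign(rewind(ps)))
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        print("partition %d offset %d key %s" %
              (msg.partition(), msg.offset(), msg.key()))
```

Because the offset is resolved broker-side, this also does the right thing when retention has already removed the first segments of a partition.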

Upvotes: 5
