frb

Reputation: 3798

Kafka: unable to consume events given an offset

I was following the Kafka quick start guide and decided to experiment a little with offsets.

The only change I made to the default configuration was adding:

log.retention.minutes=5

My test topic was created as simply as possible, as suggested in the quick start guide (1 partition, replication factor 1):

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I produced two messages, m1 and m2 (printing the date before and after):

$ date
Friday, 21 July 2017, 12:16:06 CEST
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>m1
>m2
>^C
$ date
Friday, 21 July 2017, 12:16:25 CEST

The thing is, I'm able to consume them from the beginning, but not from a given offset (for instance offset 0, which I understand points to the first message):

$ date
Friday, 21 July 2017, 12:16:29 CEST
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset 0 --partition 0
^CProcessed a total of 0 messages
$ date
Friday, 21 July 2017, 12:17:25 CEST
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
m1
m2
^CProcessed a total of 2 messages
$ date
Friday, 21 July 2017, 12:17:50 CEST

I have probably misunderstood this statement from the documentation:

In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from "now".
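To make the quoted idea concrete, here is a minimal toy model, assuming an in-memory list stands in for a single partition. The names PartitionLog, Consumer, seek and poll are hypothetical illustrations, not the Kafka client API. It shows that the consumer's position is just a number the consumer owns, so it can rewind and reread older records.

```python
# Toy, in-memory model of one Kafka partition (hypothetical names, not the
# Kafka client API). Offsets are indices into an append-only log, and the
# consumer alone decides which offset it reads next.

class PartitionLog:
    def __init__(self):
        self.records = []  # the record at index i has offset i

    def append(self, value):
        self.records.append(value)
        return len(self.records) - 1  # offset assigned to the new record

    def read_from(self, offset):
        return self.records[offset:]


class Consumer:
    def __init__(self, log):
        self.log = log
        self.position = 0  # the only per-consumer state: next offset to read

    def seek(self, offset):
        self.position = offset

    def poll(self):
        batch = self.log.read_from(self.position)
        self.position += len(batch)
        return batch


log = PartitionLog()
for value in ("m1", "m2", "m3"):
    log.append(value)

consumer = Consumer(log)
print(consumer.poll())  # ['m1', 'm2', 'm3']
consumer.seek(1)        # rewind to reprocess older data
print(consumer.poll())  # ['m2', 'm3']
print(consumer.poll())  # []
```

This model has no retention, so every offset from 0 upward still exists; the answers below explain why that assumption breaks once old records are deleted.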

Moreover, I've seen that if I produce a third message (m3) after running the consumer as described above (i.e. pointing to offset 0), this third message is read:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset 0 --partition 0
m3

Could anybody explain this behavior, please? Thanks!

Upvotes: 1

Views: 3059

Answers (2)

Sönke Liebau

Reputation: 1973

Alright, after a lot of comments and a bit of code searching, I think this is what is happening:

When you set the retention period to 5 minutes, you caused Kafka to delete some of your old messages, most notably the one with offset 0. So at some point the smallest offset in partition 0 became, let's say, 4. When you start a console consumer with --from-beginning, it internally calls a method that initializes the starting offset to the smallest offset that can be found in the partition (4 in this case). The consumer starts polling from that offset and receives that message and all subsequent ones, i.e. every message currently in the partition.
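A minimal sketch of this part of the explanation, assuming a toy in-memory partition (PartitionLog, delete_before and consume_from_beginning are hypothetical names, not Kafka's implementation). Offsets are never reused, so after retention deletes the oldest records the smallest valid offset (the "log start offset") is greater than 0. Reading "from the beginning" first resolves that smallest offset, so the deleted offsets never come into play. The four deleted records are made up to match the "let's say 4" example.

```python
# Toy model (hypothetical, not Kafka's implementation) of a partition whose
# oldest records were removed by time-based retention.

class PartitionLog:
    def __init__(self):
        self.log_start_offset = 0  # smallest offset still stored
        self.records = []          # records[i] has offset log_start_offset + i

    @property
    def log_end_offset(self):
        # Offset the next appended record will receive.
        return self.log_start_offset + len(self.records)

    def append(self, value):
        self.records.append(value)

    def delete_before(self, offset):
        # Simulates retention deleting every record below `offset`.
        offset = min(offset, self.log_end_offset)
        if offset <= self.log_start_offset:
            return
        self.records = self.records[offset - self.log_start_offset:]
        self.log_start_offset = offset

    def read_from(self, offset):
        return self.records[offset - self.log_start_offset:]


def consume_from_beginning(log):
    # Like --from-beginning: resolve the position to the smallest existing
    # offset first, then read everything from there.
    start = log.log_start_offset
    return start, log.read_from(start)


log = PartitionLog()
for value in ("a", "b", "c", "d", "m1", "m2"):
    log.append(value)
log.delete_before(4)  # retention removed offsets 0-3

print(consume_from_beginning(log))  # (4, ['m1', 'm2'])
```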

If you start a consumer with --offset 0, that piece of code is bypassed and the consumer polls with an offset of 0, to which the broker responds with an OFFSET_OUT_OF_RANGE error. Upon receiving that error, the consumer resets the offset for the partition in question, and to do so it uses the parameter auto.offset.reset, which in theory can be earliest or latest. However, due to the way the ConsoleConsumer is written, the only way to set this parameter to earliest is to pass the command line parameter --from-beginning, which cannot be combined with --offset. So effectively the only value auto.offset.reset can have here is latest. What happens when you poll with an offset of 0 that no longer exists is therefore one unsuccessful poll for data, followed by the same behavior as if you hadn't passed any parameter at all: the consumer jumps to the end of the partition and only sees messages produced afterwards (such as m3).
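The same reasoning as a small simulation, assuming a toy broker that only serves offsets in [log_start_offset, log_end_offset] and a consumer that falls back to auto.offset.reset when it gets an out-of-range error. PartitionLog, OffsetOutOfRange and poll are hypothetical names, not the real client API. As a simplification, the reset here costs one empty poll; the real consumer may perform the reset within a single poll() call. With "latest", m1 and m2 are skipped and only the later m3 is returned, which matches what the question observed.

```python
# Toy model (hypothetical names, not the real consumer code): fetching below
# the log start offset is rejected, and the consumer then resets its position
# according to auto.offset.reset.

class OffsetOutOfRange(Exception):
    pass


class PartitionLog:
    def __init__(self, log_start_offset, records):
        self.log_start_offset = log_start_offset
        self.records = list(records)  # records[i] has offset log_start_offset + i

    @property
    def log_end_offset(self):
        return self.log_start_offset + len(self.records)

    def fetch(self, offset):
        # The broker rejects offsets outside [log_start_offset, log_end_offset].
        if not self.log_start_offset <= offset <= self.log_end_offset:
            raise OffsetOutOfRange(offset)
        return self.records[offset - self.log_start_offset:]


def poll(log, position, auto_offset_reset):
    """Return (records, next_position) for one simplified poll."""
    try:
        batch = log.fetch(position)
    except OffsetOutOfRange:
        # Reset according to auto.offset.reset; this poll returns nothing.
        if auto_offset_reset == "earliest":
            return [], log.log_start_offset
        return [], log.log_end_offset  # "latest"
    return batch, position + len(batch)


log = PartitionLog(log_start_offset=4, records=["m1", "m2"])

# --offset 0 with the console consumer: auto.offset.reset is effectively "latest".
batch, pos = poll(log, 0, "latest")
print(batch, pos)                # [] 6
print(poll(log, pos, "latest"))  # ([], 6): m1 and m2 are skipped
log.records.append("m3")         # a new message arrives at offset 6
print(poll(log, pos, "latest"))  # (['m3'], 7)
```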

Hope that helps and makes sense.

Update:

As of Kafka 1.0, this behavior has been changed by KAFKA-5629 and should now be more in line with expectations.

Upvotes: 4

user2934558
user2934558

Reputation: 41

You can try --offset earliest instead of a numeric offset (like a numeric offset, it has to be combined with --partition).
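Why this helps, as a sketch: the console consumer's --offset option accepts a non-negative number, "earliest" (start of the partition) or "latest" (end of the partition). The hypothetical resolve_offset helper below is not Kafka's code; it only illustrates that "earliest" resolves to whatever the smallest existing offset currently is, while a literal 0 is used as-is even after retention has deleted it. The offsets 4 and 6 are assumed values matching the accepted answer's example.

```python
# Hypothetical helper illustrating how an --offset value maps to a position.
# Not Kafka's actual implementation.

def resolve_offset(spec, log_start_offset, log_end_offset):
    if spec == "earliest":
        return log_start_offset  # smallest offset that still exists
    if spec == "latest":
        return log_end_offset    # offset the next produced message will get
    offset = int(spec)
    if offset < 0:
        raise ValueError("offset must be non-negative")
    return offset  # used as-is, even if retention already deleted it


print(resolve_offset("earliest", 4, 6))  # 4: m1 and m2 are read
print(resolve_offset("0", 4, 6))         # 0: out of range on this partition
```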

Upvotes: 0
