Bushra Kidwai

Reputation: 1

Kafka consumer consuming older messages on restarting

I am consuming Kafka messages from a topic, but the issue is that every time the consumer restarts it reads older processed messages.

I have used auto.offset.reset=earliest. Will setting it manually using commit async help me overcome this issue?

I see that Kafka already sets enable.auto.commit to true by default.

Upvotes: 0

Views: 4191

Answers (1)

JavaTechnical

Reputation: 9357

I have used auto.offset.reset=earliest. Will setting it manually using commit async help me overcome this issue?

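The setting auto.offset.reset=earliest only takes effect when the consumer group has no committed offset for a partition (or the committed offset is out of range). In that case the consumer starts from the earliest available offset instead of the latest one. So the first time you start your process with a new group.id and this set to earliest, it reads from the earliest retained offset; on later restarts with the same group.id it should resume from the last committed offset.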
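As a rough illustration of the point above, here is a minimal sketch of the two settings that decide where a restarted consumer begins. The class name, method name, and group id are hypothetical, and the properties a real consumer also needs (bootstrap.servers, deserializers) are left out on purpose.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class ConsumerOffsetConfig {

    static Properties build(String groupId) {
        Properties props = new Properties();
        // Keep group.id identical across restarts so the offsets committed
        // by the previous run are found again.
        props.setProperty("group.id", groupId);
        // Consulted only when the group has no committed offset for a
        // partition, or the committed offset is out of range.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("orders-processor"); // placeholder group id
        System.out.println("group.id=" + props.getProperty("group.id"));
        System.out.println("auto.offset.reset=" + props.getProperty("auto.offset.reset"));
    }
}
```

If the group id is generated per run (for example with a random suffix), every restart looks like a brand-new group, and earliest sends it back to the start each time.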

Here is how the issue can be debugged:

If your consumer's group.id is the same across every restart, you need to check whether the commit is actually happening.

  1. Check whether you are overriding enable.auto.commit to false anywhere.
  2. Next, check the auto-commit interval (auto.commit.interval.ms), which defaults to 5 seconds. See whether you have raised it and whether you are restarting your process before a commit is triggered.
  3. You can also call commitAsync() or commitSync() to commit manually. Use commitSync() (a blocking call) while testing to see whether committing throws an exception. Some possible errors during a commit are (from the docs):

CommitFailedException - thrown when you try to commit to partitions that are no longer assigned to this consumer, for example because the consumer is no longer part of the group.

RebalanceInProgressException - thrown if the consumer instance is in the middle of a rebalance, so it is not yet determined which partitions will be assigned to it.

TimeoutException - thrown if the timeout specified by default.api.timeout.ms expires before the offset commit completes successfully.
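To make the commitSync() check concrete, here is a minimal sketch of a short-lived probe that polls a few times and reports any commit failure. It assumes the kafka-clients library is on the classpath; the class name, broker address (localhost:9092), group id (my-group), and topic (my-topic) are placeholders, not values from the question.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical debugging probe; requires kafka-clients and a reachable broker.
final class CommitSyncProbe {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Auto commit is turned off so that every commit goes through commitSync().
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "my-group"); // placeholders
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            // A bounded number of polls is enough for a debugging run.
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d%n",
                            record.partition(), record.offset());
                }
                try {
                    // Blocks until the offsets from the last poll() are committed or an error occurs.
                    consumer.commitSync();
                    System.out.println("commit succeeded");
                } catch (CommitFailedException e) {
                    System.err.println("partitions no longer assigned to this consumer: " + e.getMessage());
                } catch (RebalanceInProgressException e) {
                    System.err.println("rebalance in progress, commit skipped: " + e.getMessage());
                } catch (TimeoutException e) {
                    System.err.println("commit timed out: " + e.getMessage());
                }
            }
        }
    }
}
```

If "commit succeeded" is printed and a second run with the same group.id still starts from old offsets, the cause is more likely a changing group id or an explicit seek than a failed commit.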

Apart from this:

  1. Also check whether you call seek() or seekToBeginning() anywhere in your consumer code. If you do, the next poll() will likely return older messages as well.
  2. If you are using Embedded Kafka for testing, the topic and the consumer groups will likely be recreated every time you restart your test, so the consumer reads from the start. Check whether this is your case.

Without looking at the code, it is hard to tell exactly what the error is. This answer only offers some pointers for debugging your scenario.

Upvotes: 2
