Reputation: 1
I am consuming Kafka messages from a topic, but the issue is that every time the consumer restarts it reads older processed messages.
I have used auto.offset.reset=earliest
. Will setting it manually using commit async help me overcome this issue?
I see that Kafka already has enabled auto commit to true
by default.
Upvotes: 0
Views: 4191
Reputation: 9357
I have used
auto.offset.reset=earliest
. Wwill setting it manually using commit async help me overcome this issue?
When the setting auto.offset.reset=earliest
is set the consumer will read from the earliest offset that is available instead of from the last offset. So, the first time you start your process with a new group.id
and set this to earliest
it will read from the starting offset.
Here is how we the issue can be debugged..
If your consumer group.id
is same across every restart, you need to check if the commit is actually happening.
enable.auto.commit
to false
anywhere.auto.commit.interval.ms
) which is by default 5 sec
and see if you have changed it to something higher and that you are restarting your process before the commit is getting triggered.commitAsync()
or even commitSync()
to manually trigger. Use commitSync()
(blocking call) for testing if there is any exception while committing. Few possible errors during committing are (from docs)
CommitFailedException
- When you are trying to commit to partitions that are no longer assigned to this consumer because the consumer is for example no longer part of the group this exception would be thrown
RebalanceInProgressException
- If the consumer instance is in the middle of a rebalance so it is not yet determined which partitions would be assigned to the consumer.
TimeoutException
- if the timeout specified bydefault.api.timeout.ms
expires before successful completion of the offset commit
Apart from this..
seek()
or seekToBeginning()
in your consumer code anywhere. If you are doing this and calling poll()
you will likely get older messages also.Without looking into the code it is hard to tell what exactly is the error. This answer provides only an insight on debugging your scenario.
Upvotes: 2