Reputation: 189
I want to tell Kafka when my consumer has successfully processed a record, so I have turned auto-commit off by setting enable.auto.commit to false. I have two messages on a topic I am subscribed to, at offsets zero and one, and have created a consumer so that each call to poll will return at most one record (by setting max.poll.records to 1).
I now call consumer.poll(5000) and receive the first message, but I do not acknowledge it; I do not call commitSync or commitAsync. If I now call consumer.poll(5000) again, using the same consumer, I expect to get the exact same message I just read but, instead, I receive the second message.
How do I get consumer.poll to keep handing out the same message until I explicitly acknowledge it?
Upvotes: 5
Views: 3752
Reputation: 26885
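What you described is the expected behaviour. Every time you call poll(), it returns the next messages, because the consumer tracks its own in-memory position and advances it on every poll() whether or not you commit. The offset you commit is only used when a new consumer connects, so it knows where to (re)start from.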
In MessageHub, we've set session.timeout.ms to 30 seconds, so you need to call poll() more often than that to avoid being disconnected. If your processing takes longer than that, then I can think of 2 options:
1. Use the Kafka 0.10.2 client and set max.poll.interval.ms to the longest time you need between calls to poll(). The client then keeps the session alive in the background (without you having to call poll()) while you process the previous record. (This feature was added in 0.10.1 but we don't support that version. 0.10.2 works because it can talk to 0.10.0 brokers.)
2. Use seek() to move back to the previous offset after poll() so it keeps returning the same record.
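For the seek() option, something along these lines might work. This is only a sketch under a few assumptions: it uses the poll(Duration) overload from kafka-clients 2.0 or later (on 0.10.x you would call poll(long) instead), and the class RedeliverUntilAcked, the method pollOnce and the handler predicate are hypothetical names of mine, not part of the Kafka API. The handler returns true when a record has been processed successfully.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.function.Predicate;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class, not part of the Kafka client library.
public class RedeliverUntilAcked {

    // Polls once and hands each record to the handler.
    // Acknowledged records are committed; the first unacknowledged record
    // in a partition is rewound with seek() so the next poll returns it again.
    static <K, V> void pollOnce(Consumer<K, V> consumer,
                                Duration timeout,
                                Predicate<ConsumerRecord<K, V>> handler) {
        ConsumerRecords<K, V> records = consumer.poll(timeout);
        for (TopicPartition tp : records.partitions()) {
            for (ConsumerRecord<K, V> record : records.records(tp)) {
                if (handler.test(record)) {
                    // The committed offset is the offset of the NEXT record to read.
                    consumer.commitSync(Collections.singletonMap(
                            tp, new OffsetAndMetadata(record.offset() + 1)));
                } else {
                    // Move the in-memory position back to the unacknowledged record
                    // and skip the rest of this partition's batch.
                    consumer.seek(tp, record.offset());
                    break;
                }
            }
        }
    }
}
```

With max.poll.records set to 1, as in the question, each call to pollOnce handles at most one record, and a record the handler rejects is returned again by the following poll. You still have to call it often enough to stay within the session timeout.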
Hope this helps!
Upvotes: 5