Reputation: 103
I want to understand the behavior of the kafkaConsumer.poll() method. I configured my consumer not to auto-commit:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapAddress);
properties.setProperty("enable.auto.commit", "false"); // turn off periodic auto commit
KafkaConsumer consumer = new KafkaConsumer(properties);
As far as I understand, based on the Javadoc, if I do
ConsumerRecords firstBatch = consumer.poll(0L);
ConsumerRecords secondBatch = consumer.poll(0L);
then both firstBatch and secondBatch should contain the same ConsumerRecords, assuming that there is only one partition in the topic, since the offset has not been committed.
Is my assumption correct? My problem is that each time I call consumer.poll(0L), the next batch of ConsumerRecords is fetched.
Upvotes: 2
Views: 850
Reputation: 40098
Both firstBatch and secondBatch should contain the same ConsumerRecords
This is wrong. The consumer's position advances automatically on every poll, even when auto commit is disabled or offsets are committed manually. The Javadoc distinguishes the position from the committed offset:
The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(long)
The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).
Your assumption does hold in one case: when the offset is not committed and the Kafka consumer is restarted, it resumes from the last committed offset and so polls the old batch again. If no offset was ever committed, the starting point depends on the auto.offset.reset setting.
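To make the difference between the position and the committed offset concrete, here is a minimal toy model in plain Java. It is not the Kafka client: ToyConsumer and all of its methods are hypothetical names invented for this sketch, and it assumes a single partition held in memory. It only mirrors the rule described above, that poll advances the position while a restart falls back to the committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy model of one partition and one consumer (NOT the Kafka client).
 * ToyConsumer and its methods are hypothetical, for illustration only.
 */
final class ToyConsumer {
    private final List<String> log; // records in the partition; list index == offset
    private long position;          // offset of the next record poll() will return
    private long committed;         // last offset that was stored durably

    ToyConsumer(List<String> log) {
        this.log = log;
    }

    /** Returns up to maxRecords records and advances the in-memory position. */
    List<String> poll(int maxRecords) {
        int from = (int) position;
        int to = Math.min(log.size(), from + maxRecords);
        List<String> batch = new ArrayList<>(log.subList(from, to));
        position = to;
        return batch;
    }

    /** Stores the current position as the committed offset. */
    void commit() {
        committed = position;
    }

    /** Simulates a crash and restart: the position falls back to the committed offset. */
    void restart() {
        position = committed;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(Arrays.asList("a", "b", "c", "d"));

        // Two polls without a commit return different batches.
        List<String> firstBatch = consumer.poll(2);
        List<String> secondBatch = consumer.poll(2);
        System.out.println(firstBatch + " " + secondBatch); // prints "[a, b] [c, d]"

        // Nothing was committed, so a restart re-reads the first batch.
        consumer.restart();
        System.out.println(consumer.poll(2)); // prints "[a, b]"
    }
}
```

With the real client, re-reading a batch without restarting would mean rewinding the position explicitly, for example with consumer.seek(partition, offset).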
Upvotes: 2