Reputation: 23
I'm learning about Kafka.
In my unit test, I send 4 offsets, consumser will commit 3 offsets and do nothing in the last offset.
But when I continue to poll, I didn't get the offset I not ack()
I start a new consumser and was able to poll that message.
Noted: I'm not using Spring. And I trying to avoid using it. (Some personal reason)
Below is my Property for Consumser
consumerProps[ConsumerConfig.GROUP_ID_CONFIG] = "group_test_1"
consumerProps[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
consumerProps[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = 30_000L.toString()
consumerProps[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1
consumerProps[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = "3000"
consumerProps[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
consumerProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost"
consumerProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] =
StringDeserializer::class.java.name
consumerProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] =
StringDeserializer::class.java.name
Example of my code proccessing offset
val record = consumser.poll(Duration.ofMillis(5_000L))
if (record.count() > 0) {
if (record.first().value() == "DDD") {
Thread.sleep(3000)
LOG.debug("Do nothing")
timeHitNotAck += 1
continue
} else {
consumser.commitSync()
}
}
Unit test footprint ----------------------------------
Send 4 messge
start first Consumser --> commit() 3 offsets, do nothing in last offset
continue to poll --> not recieve the last offset Loop try for more than 3000 millis_second
start second Consumser --> success poll last offset that didn't get commit()
Upvotes: 1
Views: 710
Reputation: 3348
If I got the question correct - If the consumer hasn't committed the offset, on the next try should it be available again? Assuming the consumer is still running.
The answer is No. This is because although to the Broker it is indeed unknown that the offset has been processed, it is known to the consumer during its runtime. Thus in the next poll, it'll request for a newer offset (that it hasn't received yet i.e. offset after "last processed") - not for the offset after "last committed"
This is when the consumer is still running, while during restart it's dependent on the Broker for the resume point (Unless of course consumer makes any specific request like "from X offset")
For further reading, do look at this question How does Kafka provides next batch of records to poll when commitAsync gets failed in committing offset
Upvotes: 1