Reputation: 805
I have a spring-kafka consumer which reads records and hands them over to a cache. A scheduled task will clear the records in the cache periodically. I want to update the COMMIT OFFSET only after a batch has been successfully saved in the database. I tried passing the acknowledgment object to the cache service to invoke the acknowledge method as shown below.
public class KafkaConsumer {
    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}
public class CacheService {
    // concurrency handling has been left out in favor of readability
    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add(record);
        this.lastAcknowledgment = acknowledgment;
    }
    public void saveBatch() { // called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}
The AckMode has been set as follows:
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
And the Auto Commit is false:
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
Even though the acknowledge method is called, the commit offset is not updated. What is the best way to update the commit offset after persisting the records?
I'm using spring-kafka 2.1.7.RELEASE.
EDIT: After @GaryRussell confirmed that acknowledgements made by foreign threads are performed by the consumer thread during the next poll, I rechecked my code and found a bug in how the last acknowledgement object is set. After fixing this, the commit offset IS UPDATED as expected. So this issue has been resolved. However, I have no way to mark this question as answered.
Upvotes: 2
Views: 2488
Reputation: 40048
The problem is that the consumer thread is responsible for committing the offset. At the time of the next poll, the consumer thread commits the offsets of the previous batch.
Since in your case AUTO_COMMIT is false and lastAcknowledgment.acknowledge() is not taking effect, the offset is not committed.
There is only one way to do this: as soon as you receive the polled records, run the scheduled task asynchronously, hold the consumer thread, and commit the offset after the async task completes. See this answer for reference.
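A minimal sketch of that idea, assuming plain `java.util.concurrent` and no Spring on the classpath: the consumer thread starts the save asynchronously, waits for it with a timeout, and only then acknowledges. `Ack` is a hypothetical stand-in for Spring Kafka's `Acknowledgment`, and `BlockingBatchHandler`, `batchSaver` and `timeoutMs` are illustrative names, not part of any library.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical stand-in for Spring Kafka's Acknowledgment so the sketch runs without Spring.
interface Ack {
    void acknowledge();
}

class BlockingBatchHandler {
    // Single daemon worker thread that performs the (hypothetical) database write.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "batch-saver");
        t.setDaemon(true);
        return t;
    });
    private final Consumer<List<String>> batchSaver; // assumed database writer
    private final long timeoutMs;                    // keep well below max.poll.interval.ms

    BlockingBatchHandler(Consumer<List<String>> batchSaver, long timeoutMs) {
        this.batchSaver = batchSaver;
        this.timeoutMs = timeoutMs;
    }

    // Intended to be called on the consumer thread.
    // Returns true only if the batch was saved and then acknowledged.
    boolean handle(List<String> batch, Ack ack) {
        CompletableFuture<Void> save =
                CompletableFuture.runAsync(() -> batchSaver.accept(batch), worker);
        try {
            // Hold the calling (consumer) thread until the async save finishes.
            save.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // Save failed or took too long: do not acknowledge.
            return false;
        }
        ack.acknowledge(); // acknowledge only after a successful save
        return true;
    }

    void shutdown() {
        worker.shutdown();
    }
}
```

In a real listener, `handle` would be called from the `@KafkaListener` method with the received `Acknowledgment` wrapped as `acknowledgment::acknowledge`. The timeout matters because the consumer thread cannot poll while it is blocked.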
Note: if you hold the consumer thread for more than 5 minutes, a rebalance will take place. See here:
The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
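The settings named in that note can be written out as consumer properties. The following is only a sketch using the plain string keys with `java.util.Properties`; `ConsumerTimeoutConfig`, `build` and `maxBatchProcessingMs` are hypothetical names, and both the 2x safety margin and the 5 second gap between `request.timeout.ms` and `max.poll.interval.ms` are assumptions chosen for illustration.

```java
import java.util.Properties;

class ConsumerTimeoutConfig {

    // Builds consumer properties so that a batch taking up to maxBatchProcessingMs
    // between polls should not trigger a rebalance (assumed 2x safety margin).
    static Properties build(long maxBatchProcessingMs) {
        Properties props = new Properties();

        // Offsets are committed manually, so auto commit stays off.
        props.setProperty("enable.auto.commit", "false");

        // Never go below the 5 minute default mentioned in the quoted note.
        long maxPollIntervalMs = Math.max(300_000L, maxBatchProcessingMs * 2);
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));

        // Per the quoted note, request.timeout.ms must be larger than max.poll.interval.ms.
        // The 5 second gap is an illustrative assumption.
        props.setProperty("request.timeout.ms", Long.toString(maxPollIntervalMs + 5_000L));

        // Defaults stated in the quoted note, set explicitly for visibility.
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("max.poll.records", "500");
        return props;
    }
}
```

Lowering `max.poll.records` is another way to keep the time between polls short, since each poll then hands fewer records to the listener.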
Special note for spring-kafka > 2.1.5:
Acknowledgments made on foreign threads will be performed by the consumer thread just before the next poll. Thanks to @Gary Russell for this information.
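Given that behaviour, the cache from the question can acknowledge from the scheduler thread once the batch is saved. Below is a hedged sketch of such a cache in plain Java: `PendingAck` is a hypothetical stand-in for Spring Kafka's `Acknowledgment`, and `AckingBatchCache`, `partitionKey` and `batchSaver` are illustrative names. It assumes each acknowledgment only covers the partition of its own record, so it keeps the newest one per partition, and it flushes when at least `batchSize` records are buffered, which differs from the `==` check in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for Spring Kafka's Acknowledgment so the sketch runs without Spring.
interface PendingAck {
    void acknowledge();
}

class AckingBatchCache {
    private final int batchSize;
    private final Consumer<List<String>> batchSaver; // assumed database writer
    private final List<String> records = new ArrayList<>();
    // Newest acknowledgment per partition (assumption: one ack covers one partition).
    private final Map<String, PendingAck> latestAcks = new HashMap<>();

    AckingBatchCache(int batchSize, Consumer<List<String>> batchSaver) {
        this.batchSize = batchSize;
        this.batchSaver = batchSaver;
    }

    // Called from the listener (consumer thread) for every record.
    synchronized void add(String partitionKey, String value, PendingAck ack) {
        records.add(value);
        latestAcks.put(partitionKey, ack); // a newer ack replaces the older one
    }

    // Called by the scheduled task. Returns the number of records saved.
    // Holding the lock during the save keeps the sketch simple but blocks add().
    synchronized int saveBatch() {
        if (records.size() < batchSize) {
            return 0;
        }
        List<String> batch = new ArrayList<>(records);
        // If the save throws, nothing is acknowledged and the records stay buffered.
        batchSaver.accept(batch);
        latestAcks.values().forEach(PendingAck::acknowledge);
        records.clear();
        latestAcks.clear();
        return batch.size();
    }
}
```

The records and their acknowledgments are updated under the same lock, so an acknowledgment can never be issued for a record that was not part of the saved batch.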
Upvotes: 3