Reputation: 93
I have a Spring Boot Kafka consumer that consumes data from a topic, stores it in a database, and acknowledges the record once it is stored. This works fine, but there is a problem when the application fails to get a DB connection after consuming a record. In that case we do not send the acknowledgment, yet the message is never consumed again unless we change the group id and restart the consumer.
My consumer looks like this:
```java
@KafkaListener(id = "${group.id}", topics = {"${kafka.edi.topic}"})
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
    boolean shouldAcknowledge = false;
    try {
        String tNo = getTrackingNumber((String) record.key());
        log.info("Check Duplicate By Comparing With DB records");
        if (!ediRecordService.isDuplicate(tNo)) { // this checks the record in my DB
            shouldAcknowledge = insertEDIRecord(record, tNo); // this returns true
        } else {
            log.warn("Duplicate record found.");
            shouldAcknowledge = true;
        }
        if (shouldAcknowledge) {
            acknowledgment.acknowledge();
        }
        // remainder of the method (catch block) not shown
```
As the snippet above shows, we do not send the acknowledgment when the record could not be stored.
Upvotes: 4
Views: 5670
Reputation: 40048
That is not how Kafka offsets work. From the Kafka documentation:
The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.
For example, suppose the first poll returns the record at offset 300 and the consumer fails to persist it to the database, so it does not commit that offset. Skipping the acknowledgment does not rewind the consumer: it keeps its own position in memory, so the next poll returns the record at offset 301. If that record is persisted successfully, the consumer commits offset 301, which means every record in that partition up to that offset (301 in this example, including the failed 300) is treated as processed. That is why the failed record is not delivered again, even after a restart, unless you switch to a new group id.
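The offset behaviour described above can be mimicked with a small in-memory model. This is only a sketch: `OffsetCommitSketch` and its methods are hypothetical names, not part of the Kafka client or Spring Kafka API, and the model assumes a single partition and a single consumer.

```java
// Hypothetical in-memory stand-in for one partition and one consumer group.
// It is NOT the Kafka client API; it only mimics position vs. committed offset.
class OffsetCommitSketch {
    private long position;   // next offset the running consumer will fetch
    private long committed;  // offset a restarted consumer would resume from

    OffsetCommitSketch(long start) {
        this.position = start;
        this.committed = start;
    }

    // Fetch the next record: the in-memory position advances
    // whether or not the previous record was acknowledged.
    long poll() {
        return position++;
    }

    // Acknowledge a record: the committed offset moves past it,
    // which implicitly covers every earlier offset as well.
    void acknowledge(long offset) {
        committed = Math.max(committed, offset + 1);
    }

    // A restart resumes from the committed offset.
    void restart() {
        position = committed;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        OffsetCommitSketch consumer = new OffsetCommitSketch(300);
        long failed = consumer.poll();    // offset 300: DB insert fails, no acknowledge
        long stored = consumer.poll();    // offset 301: DB insert succeeds
        consumer.acknowledge(stored);
        consumer.restart();
        // The failed record at offset 300 is skipped after the restart.
        System.out.println("failed=" + failed + ", next offset after restart=" + consumer.poll());
    }
}
```

In this model the record at offset 300 is never seen again once 301 has been acknowledged, which matches what the question describes.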
Solution: retry until the data is stored successfully, with a limited number of retries, or publish the failed record to an error topic and reprocess it later. Alternatively, save the offsets of the failed records somewhere so you can reprocess them later.
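A minimal sketch of the "limited retries, then park the record" idea, written in plain Java so it runs without a broker. `RetrySketch`, `processWithRetry`, the `store` callback, and the `deadLetters` list are all hypothetical names introduced for illustration; in a real application the list would be replaced by a producer that publishes to an error topic, and a delay between attempts would usually be added.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class RetrySketch {
    // Tries to store the record up to maxAttempts times.
    // Returns true if the record was stored, false if it was parked in deadLetters.
    // In both cases the caller can acknowledge, because the record is either
    // persisted or kept aside for later reprocessing.
    static <T> boolean processWithRetry(T record, Predicate<T> store,
                                        int maxAttempts, List<T> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (store.test(record)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // For example a failed DB connection: fall through and try again.
            }
        }
        // Retries exhausted: stand-in for publishing to an error topic.
        deadLetters.add(record);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int[] calls = {0};
        // Simulated store call that fails twice before succeeding.
        boolean stored = processWithRetry("record-300", r -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("no DB connection");
            }
            return true;
        }, 3, deadLetters);
        System.out.println("stored=" + stored + ", deadLetters=" + deadLetters);
    }
}
```

With something like this in the listener, the acknowledgment is sent only after the record has either been stored or handed off, so a later commit cannot silently skip it.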
Upvotes: 3