Biplab

Reputation: 93

Spring Boot Manual Acknowledgement of kafka messages is not working

I have a Spring Boot Kafka consumer that consumes data from a topic, stores it in a database, and acknowledges the message once it is stored. It works fine, but a problem occurs if the application fails to get a DB connection after consuming the record: in that case we do not send the acknowledgement, yet the message is never consumed again unless we change the group id and restart the consumer.

My consumer looks like below

```java
@KafkaListener(id = "${group.id}", topics = {"${kafka.edi.topic}"})
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
    boolean shouldAcknowledge = false;
    try {
        String tNo = getTrackingNumber((String) record.key());

        log.info("Check Duplicate By Comparing With DB records");

        if (!ediRecordService.isDuplicate(tNo)) { // this checks the record in my DB
            shouldAcknowledge = insertEDIRecord(record, tNo); // this returns true
        } else {
            log.warn("Duplicate record found.");
            shouldAcknowledge = true;
        }
        if (shouldAcknowledge) {
            acknowledgment.acknowledge();
        }
    } catch (Exception e) {
        log.error("Failed to process record", e);
    }
}
```



So, as the snippet above shows, we did not send the acknowledgment in the failure case.

Upvotes: 4

Views: 5670

Answers (1)

Ryuzaki L

Reputation: 40048

That is not how Kafka offsets work.

> The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

For example, suppose the first poll returns the message at offset 300. If the consumer fails to persist it into the database because of some issue, it will not commit that offset.

But on the next poll it will still get the next record, at offset 301, and if that one is persisted into the database successfully, the consumer commits offset 301 (which means all records in that partition up to that offset are considered processed, including the failed record at 300).
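The behaviour described above can be sketched with a toy model in plain Java (no Kafka dependency; the class, offsets, and method names are illustrative, not real Kafka API): the consumer's in-memory position advances on every poll regardless of commits, so a skipped commit does not cause redelivery within the same session.

```java
// Toy model of a consumer's position vs. its committed offset (not real Kafka).
class ToyConsumer {
    private long position = 300;  // where the next poll reads from (in-memory)
    private long committed = 300; // last committed offset (what survives a restart)

    long poll() {
        return position++;        // polling advances the position regardless of commits
    }

    void commit(long offset) {
        committed = offset + 1;   // committing marks everything up to `offset` as processed
    }

    long committed() { return committed; }
}

public class OffsetDemo {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();

        long first = consumer.poll();  // offset 300: pretend the DB insert fails -> no commit
        long second = consumer.poll(); // offset 301: DB insert succeeds
        consumer.commit(second);       // commits 301, silently "covering" the failed 300

        System.out.println(first);                // 300
        System.out.println(second);               // 301
        System.out.println(consumer.committed()); // 302
    }
}
```

Only a restart (or a rebalance) makes the consumer fall back to the committed offset, which is why the failed record at 300 never comes back in the running session.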

Solution for this: use a retry mechanism with a limited number of retries until the data is successfully stored in the database, or publish the failed record to an error topic and reprocess it later, or save the offsets of failed records somewhere so you can reprocess them later.
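A minimal sketch of the first option, a bounded retry loop in plain Java (the `retry` helper, `maxAttempts`, and the simulated insert are all illustrative; in a real Spring Kafka app you would more likely configure the framework's error-handling with back-off instead of hand-rolling this):

```java
import java.util.function.Supplier;

public class BoundedRetry {
    // Retries `action` up to `maxAttempts` times; returns true on the first success,
    // false if every attempt failed (caller can then route the record to an error topic).
    static boolean retry(Supplier<Boolean> action, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (action.get()) {
                    return true;
                }
            } catch (Exception e) {
                // log and fall through to the next attempt
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated DB insert that fails twice, then succeeds on the third call.
        int[] calls = {0};
        boolean ok = retry(() -> ++calls[0] >= 3, 5);
        System.out.println(ok);       // true
        System.out.println(calls[0]); // 3
    }
}
```

Only acknowledge (and thus commit) when `retry` returns true; on false, publish the record to an error topic before acknowledging, so nothing is lost.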

Upvotes: 3
