user1816183
user1816183

Reputation: 141

How to manually acknoweldge offset after multiple commits

I am working on a Spring kafka application. I am sending a batch of records to it. I only want to acknowledge the offset after all of the records have been sent rather than after each record.

I've tried the code below (copied from here https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_usage_examples) but this acknowledges the offset after each record rather than the batch of records. I set the following property first

spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment =     message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

How would I go about manually acknowledging after the batch of records?

Upvotes: 0

Views: 65

Answers (1)

Spasoje Petronijević
Spasoje Petronijević

Reputation: 1608

As you can see here you can receive list of messages so your example would look like this:

@StreamListener(Sink.INPUT)
 public void process(List<Message<?>> messages) {
     Acknowledgment acknowledgment =     message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }

And after you are done with all records you can call acknowledgment.acknowledge();

Upvotes: 1

Related Questions