Reputation: 141
I am working on a Spring kafka application. I am sending a batch of records to it. I only want to acknowledge the offset after all of the records have been sent rather than after each record.
I've tried the code below (copied from here https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_usage_examples) but this acknowledges the offset after each record rather than the batch of records. I set the following property first
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
How would I go about manually acknowledging after the batch of records?
Upvotes: 0
Views: 65
Reputation: 1608
As you can see here you can receive list of messages so your example would look like this:
@StreamListener(Sink.INPUT)
public void process(List<Message<?>> messages) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
And after you are done with all records you can call acknowledgment.acknowledge();
Upvotes: 1