needoriginalname

Reputation: 721

How to commit after each kafka message in spring-kafka?

In our application we use a Kafka consumer to determine whether to send an email.

We had an issue the other day where the Kafka partition was timing out before it could read and process all its records. As a result, it looped back to the start of the partition and wasn't able to finish the set of records it had received, and new data generated after the loop started never got processed.

My team suggested that we could tell Kafka to commit after each message is read; however, I can't figure out how to do that from spring-kafka.

The application uses spring-kafka 2.1.6, and the consumer code looks roughly like this:

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, @Header("kafka_offset") int offSet) {
    try{
        EmailData data = objectMapper.readValue(message, EmailData.class);
        if(isEligableForEmail(data)){
            emailHandler.sendEmail(data);
        }
    } catch (Exception e) {
        log.error("Error: "+e.getMessage(), e);
    }
}

Note: sendEmail function uses CompletableFutures, as it has to call a different API before sending out an email.

Configuration: (snippet of the yaml file for the consumer, and a part of the producer)

consumer:
      max.poll.interval.ms: 3600000
producer: 
      retries: 0
      batch-size: 100000
      acks: 0
      buffer-memory: 33554432
      request.timeout.ms: 60000
      linger.ms: 10
      max.block.ms: 5000

Upvotes: 3

Views: 4164

Answers (2)

Gary Russell

Reputation: 174544

Set the container ackMode property to AckMode.RECORD to commit the offset after each record.
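To illustrate what per-record commits change, here is a minimal plain-Java sketch. It is a toy model, not spring-kafka code: `CommitModeSketch` and its methods are hypothetical names, offsets are just list indexes, and the "failure" stands in for the consumer being removed from the group partway through a polled batch. It counts how many already-processed records would be delivered again under each commit strategy.

```java
import java.util.List;

// Toy model only: no Kafka client is involved. Offsets are list indexes.
class CommitModeSketch {

    // Processes one polled batch and fails after `failAfter` records
    // (assumed to be between 0 and batch.size()).
    // Returns the offset that had been committed when the failure happened.
    static int committedOffsetAtFailure(List<String> batch, int failAfter, boolean commitPerRecord) {
        int committed = 0; // offset the consumer would resume from after a restart
        for (int processed = 0; processed < batch.size(); processed++) {
            if (processed == failAfter) {
                return committed; // the consumer dies here, mid-batch
            }
            if (commitPerRecord) {
                committed = processed + 1; // commit right after each record
            }
        }
        return batch.size(); // whole batch finished, so the end of the batch is committed
    }

    // Records that were already processed but will be delivered again.
    static int redelivered(List<String> batch, int failAfter, boolean commitPerRecord) {
        return failAfter - committedOffsetAtFailure(batch, failAfter, commitPerRecord);
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b", "c", "d", "e");
        System.out.println("batch commit, redelivered: " + redelivered(batch, 3, false));
        System.out.println("record commit, redelivered: " + redelivered(batch, 3, true));
    }
}
```

With a failure after three of five records, the batch-commit case redelivers 3 records and the per-record case redelivers 0, which is the behaviour the question is after.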

You should also consider reducing max.poll.records or increasing max.poll.interval.ms Kafka consumer properties.
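These two properties trade off against each other: everything returned by one poll has to be processed before max.poll.interval.ms elapses. As a rough back-of-the-envelope sketch, assuming the Kafka client default of 500 for max.poll.records (the question does not show that setting) and the 3600000 ms interval from the question's configuration, the average time available per record can be estimated like this. `PollBudget` is a hypothetical helper, not part of any library.

```java
// Hypothetical helper for a rough estimate; not part of Kafka or spring-kafka.
class PollBudget {

    // Average time each record may take before the poll interval is exceeded.
    static long millisPerRecord(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // 1 hour interval, assumed default of 500 records per poll
        System.out.println(millisPerRecord(3_600_000L, 500)); // 7200
        // same interval, max.poll.records reduced to 50
        System.out.println(millisPerRecord(3_600_000L, 50));  // 72000
    }
}
```

If a single record can take longer than that average (for example, a slow downstream API call), lowering max.poll.records gives each record a larger share of the interval.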

Upvotes: 1

Ryuzaki L

Reputation: 40048

If you want manual acknowledgment, you can add an Acknowledgment parameter to the listener method:

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, Acknowledgment ack, @Header("kafka_offset") int offSet) {
    ack.acknowledge(); // call this once the message has been processed
}

When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.

Example from the docs (@KafkaListener Annotation):

@KafkaListener(id = "cat", topics = "myTopic",
      containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
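Since the question mentions that sendEmail uses CompletableFutures, it may matter when acknowledge() is called relative to the asynchronous work. The following is a minimal plain-Java sketch of acknowledging only after the future completes. It makes several assumptions: the `Acknowledgment` interface here is a local stand-in so the example compiles without spring-kafka, `AckAfterAsyncSketch` is a hypothetical name, and the email send is simulated by a future that just records an event (the real sendEmail signature is not shown in the question).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AckAfterAsyncSketch {

    // Local stand-in for spring-kafka's Acknowledgment, so the sketch needs no library.
    interface Acknowledgment {
        void acknowledge();
    }

    // Simulates a listener body and returns the order in which things happened.
    static List<String> consume(String message) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Acknowledgment ack = () -> events.add("ack " + message);

        // Assumption: the email send is exposed as a CompletableFuture.
        CompletableFuture<Void> sent =
                CompletableFuture.runAsync(() -> events.add("sent " + message));

        sent.join();       // block until the async work finishes; throws if it failed
        ack.acknowledge(); // only reached when the send completed normally
        return events;
    }

    public static void main(String[] args) {
        System.out.println(consume("m1")); // [sent m1, ack m1]
    }
}
```

Blocking on the future keeps the offset from being acknowledged before the email is actually sent, at the cost of a longer processing time per record, which counts against max.poll.interval.ms.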

Upvotes: 3
