Reputation: 721
In our application we use a Kafka consumer to determine whether to send an email.
We had an issue the other day where the consumer timed out on a Kafka partition before it could read and process all of its records. As a result, it looped back to the start of the partition, never finished the set of records it had received, and new data generated after the loop started was never processed.
My team suggested that we tell Kafka to commit after each message is read; however, I can't figure out how to do that from spring-kafka.
The application uses spring-kafka 2.1.6, and the consumer code roughly resembles this:
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, @Header("kafka_offset") int offSet) {
    try {
        EmailData data = objectMapper.readValue(message, EmailData.class);
        if (isEligableForEmail(data)) {
            emailHandler.sendEmail(data);
        }
    } catch (Exception e) {
        log.error("Error: " + e.getMessage(), e);
    }
}
Note: the sendEmail function uses CompletableFutures, as it has to call a different API before sending out an email.
Configuration (a snippet of the YAML file for the consumer, and part of the producer):
consumer:
  max.poll.interval.ms: 3600000
producer:
  retries: 0
  batch-size: 100000
  acks: 0
  buffer-memory: 33554432
  request.timeout.ms: 60000
  linger.ms: 10
  max.block.ms: 5000
Upvotes: 3
Views: 4164
Reputation: 174544
Set the container's ackMode property to AckMode.RECORD to commit the offset after each record.
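As a rough sketch of what that could look like with Java configuration on spring-kafka 2.1.x: the class name KafkaConsumerConfig is made up for illustration, and the sketch assumes a ConsumerFactory<String, String> bean is already defined (or auto-configured) and that enable.auto.commit is false in the consumer properties, so that the container is the one doing the commits. In 2.1.x the AckMode enum is nested in AbstractMessageListenerContainer; from 2.2 on it lives in ContainerProperties, so the import would need adjusting after an upgrade.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class KafkaConsumerConfig {

    // Named kafkaListenerContainerFactory so that @KafkaListener methods
    // without an explicit containerFactory attribute use it by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) { // assumed to exist already
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Commit the offset each time the listener returns after processing a record.
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        return factory;
    }
}
```

Note that the commit happens when the listener method returns, so work that is still running in a CompletableFuture at that point is not covered by it.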
You should also consider reducing the max.poll.records or increasing the max.poll.interval.ms Kafka consumer property.
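To see why those two properties trade off against each other: the consumer has to call poll() again within max.poll.interval.ms, so the records returned by one poll share that time window. A back-of-the-envelope sketch, using the 3600000 ms value from the question and Kafka's default max.poll.records of 500 (the PollBudget class and the reduced value of 50 are purely illustrative):

```java
/** Rough per-record time budget between two poll() calls. */
final class PollBudget {

    /** Average time available per record before max.poll.interval.ms elapses. */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        long intervalMs = 3_600_000L; // max.poll.interval.ms from the question

        // Kafka's default max.poll.records is 500.
        System.out.println(perRecordBudgetMs(intervalMs, 500)); // prints 7200

        // Hypothetical reduced value: fewer records per poll, more time for each.
        System.out.println(perRecordBudgetMs(intervalMs, 50));  // prints 72000
    }
}
```

If processing a record (including the extra API call) can take longer than that average on a bad day, the batch will not finish in time and the group will rebalance.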
Upvotes: 1
Reputation: 40048
If you want manual acknowledgment, you can declare an Acknowledgment parameter in the listener method's arguments:
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, Acknowledgment ack, @Header("kafka_offset") int offSet) {
    ack.acknowledge(); // call this once the message has been processed
}
From the "@KafkaListener Annotation" section of the docs:
When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.
@KafkaListener(id = "cat", topics = "myTopic",
        containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
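The docs example refers to a container factory bean that is not shown. A minimal sketch of how such a bean might be defined on spring-kafka 2.1.x follows; the ManualAckConfig class name is invented here, a ConsumerFactory<String, String> bean is assumed to exist, and enable.auto.commit is assumed to be false. MANUAL_IMMEDIATE is chosen for illustration; AckMode.MANUAL would also work and batches the commits instead.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class ManualAckConfig {

    // The bean name matches the containerFactory attribute used in the listener above.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) { // assumed to exist already
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The listener decides when to commit by calling ack.acknowledge();
        // with MANUAL_IMMEDIATE the commit is performed right away when that
        // call is made on the consumer thread.
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```

A manual ack mode is what makes the container pass an Acknowledgment to the listener; the listener must then call acknowledge() itself for offsets to be committed.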
Upvotes: 3