Reputation: 3955
I would like to set an automatic offset commit interval on the Kafka consumer for Spring Cloud Stream via properties.
As I can see from the metrics, by default Spring Cloud Stream Kafka commits the offset after each consumed message. This dramatically increases the load on the Kafka broker for high-traffic topics (e.g. at 10K messages per second).
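To put rough numbers on the load difference, here is a back-of-the-envelope sketch (my own illustration, not from any Spring API): it assumes one commit per record versus at most one commit per ack interval per consumer, and ignores partition count and commit batching.

```java
public class CommitLoad {

    // With per-record acknowledgement, roughly one offset commit per consumed message.
    static double commitsPerSecondRecordAck(double messagesPerSecond) {
        return messagesPerSecond;
    }

    // With a time-based ack interval, at most one commit per interval per consumer.
    static double commitsPerSecondTimeAck(long ackIntervalMs) {
        return 1000.0 / ackIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(commitsPerSecondRecordAck(10_000)); // 10000.0
        System.out.println(commitsPerSecondTimeAck(2000));     // 0.5
    }
}
```

So a 2-second commit interval would turn roughly 10K commits per second into well under one commit per second per consumer.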
We declare consumers in the following way:
@Bean
public Consumer<TestEvent> testEvents() {
    …
}
I tried a few options, but nothing helped.
Option #1
spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 2000
            enable.auto.commit: true
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
With this config, messages are consumed correctly, but the health check reports status DOWN:
{
  "status": "DOWN",
  "components": {
    "binders": {
      "status": "DOWN",
      "components": {
        "kafka": {
          "status": "DOWN",
          "details": {
            "error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
          }
        }
      }
    }
  }, …
As I understand it, enable.auto.commit: true is not recommended for Spring Cloud Stream.
Option #2
spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
            ack-mode: TIME
            ack-time: 2000
Still, the ack-mode: TIME property didn't help: we see an offset commit after each consumed message.
I use the Maven dependency spring-cloud-starter-stream-kafka, version 3.0.12.RELEASE.
Upvotes: 0
Views: 2952
Reputation: 3955
Gary Russell's answer is absolutely correct; I just want to add a similar alternative that uses the spring.kafka.listener properties and KafkaProperties (from the package org.springframework.boot.autoconfigure.kafka):
spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
  kafka:
    listener:
      ack-mode: TIME
      ack-time: 1000
and the following customizer:
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(KafkaProperties kafkaProperties) {
    return (container, destinationName, group) -> {
        var listener = kafkaProperties.getListener();
        var containerProperties = container.getContainerProperties();
        // copy ack mode and ack time from spring.kafka.listener.* onto the binder's container
        containerProperties.setAckMode(listener.getAckMode());
        containerProperties.setAckTime(listener.getAckTime().toMillis());
        containerProperties.setLogContainerConfig(true);
    };
}
Upvotes: 0
Reputation: 174554
First of all, ack-mode is a Kafka-specific consumer binding property, but you have it in the common binding properties.
spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            auto.offset.reset: latest
        bindings:
          testEvents-in-0:
            consumer:
              ack-mode: TIME
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
Second, there is no ack-time binding property; you have to set it on the listener container via a container customizer:
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
    return (container, dest, group) -> {
        container.getContainerProperties().setAckTime(2000L);
        container.getContainerProperties().setLogContainerConfig(true);
    };
}
Upvotes: 1