Vasyl Sarzhynskyi
Vasyl Sarzhynskyi

Reputation: 3955

How to set offset commit interval for Spring Cloud Stream Kafka via properties

I would like to set an automatic offset commit interval on Kafka consumer for Spring Cloud Stream via properties.

As I see from metrics, by default Spring Cloud Stream Kafka commits offset on each consumed message. It becomes dramatic and increases load on Kafka broker for highly loaded topics (e.g. if traffic is 10K messages per second).

We declare consumers in the following way:

@Bean
public Consumer<TestEvent> testEvents() {
    …
}

I tried few options, but nothing helped me.

Option #1

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 2000
            enable.auto.commit: true
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2

With such config, messages consumed correctly, but health check has status as down:

{
  "status": "DOWN",
  "components": {
    "binders": {
      "status": "DOWN",
      "components": {
        "kafka": {
          "status": "DOWN",
          "details": {
            "error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
          }
        }
      }
    }, …

As I understand, enable.auto.commit: true is not recommended for Spring Cloud Stream.

Option #2

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
            ack-mode: TIME
            ack-time: 2000

And still, such config property ack-mode: TIME didn't help, as we see offset commit on each consumed message.

I use maven dependency spring-cloud-starter-stream-kafka version 3.0.12.RELEASE

Upvotes: 0

Views: 2952

Answers (2)

Vasyl Sarzhynskyi
Vasyl Sarzhynskyi

Reputation: 3955

Gary Russell's answer is absolutly correct, I just want to add another similar alternative by using spring.kafka.listener properties and KafkaProperties (from package org.springframework.boot.autoconfigure.kafka):

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
  kafka:
    listener:
      ack-mode: TIME
      ack-time: 1000

and the following customizer:

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(KafkaProperties kafkaProperties) {
    return (container, destinationName, group) -> {
        var listener = kafkaProperties.getListener();
        var containerProperties = container.getContainerProperties();
        containerProperties.setAckMode(listener.getAckMode());
        containerProperties.setAckTime(listener.getAckTime().toMillis());
        containerProperties.setLogContainerConfig(true);
    };
}

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174554

First of all, ack-mode is a kafka-specific consumer binding property, you have it in the common properties.

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            auto.offset.reset: latest
        bindings:
          testEvents-in-0:
            consumer:
              ack-mode: TIME
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2

Second, there is no ack-time property, you have to set it on the container via a container customizer:

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
    return (container, dest, group) -> {
        container.getContainerProperties().setAckTime(2000L);
        container.getContainerProperties().setLogContainerConfig(true);
    };
}

Upvotes: 1

Related Questions