gstackoverflow

Reputation: 37034

How does spring.kafka.consumer.auto-offset-reset work in spring-kafka?

KafkaProperties Javadoc:

/**
  * What to do when there is no initial offset in Kafka or if the current offset
  * does not exist any more on the server.
  */
private String autoOffsetReset;

I have a hello-world application with the following application.properties:

spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=latest

In this case the @KafkaListener method is invoked for all entries, but I expected it to be invoked only for the latest 3 records I send. I tried another option:

spring.kafka.consumer.auto-offset-reset=earliest

But the behaviour is the same.

Can you explain this stuff?

P.S.

code sample:

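import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication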
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("spring_kafka_topic", "foo1");
        this.template.send("spring_kafka_topic", "foo2");
        this.template.send("spring_kafka_topic", "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "spring_kafka_topic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}

Update:

The behaviour doesn't depend on spring.kafka.consumer.auto-offset-reset. With spring.kafka.consumer.auto-offset-reset=earliest, it depends only on spring.kafka.consumer.enable-auto-commit:

If I set spring.kafka.consumer.enable-auto-commit=false, I see all records.

If I set spring.kafka.consumer.enable-auto-commit=true, I see only the last 3 records.

Please clarify the meaning of the spring.kafka.consumer.auto-offset-reset property.

Upvotes: 5

Views: 32120

Answers (1)

Artem Bilan

Reputation: 121262

The KafkaProperties in Spring Boot does this:

public Map<String, Object> buildProperties() {
    Map<String, Object> properties = new HashMap<String, Object>();
    if (this.autoCommitInterval != null) {
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                this.autoCommitInterval);
    }
    if (this.autoOffsetReset != null) {
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                this.autoOffsetReset);
    }
    // ... (excerpt ends here)

This buildProperties() is called from buildConsumerProperties(), which in turn is used in:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<Object, Object>(
            this.properties.buildConsumerProperties());
}

So, if you define your own ConsumerFactory bean, be sure to reuse those KafkaProperties: https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-props
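
A minimal sketch of such a custom bean, assuming a Spring Boot version where KafkaProperties.buildConsumerProperties() takes no arguments, as in the snippet above (newer Boot versions may differ). The class name KafkaConsumerConfig and the max.poll.records override are illustrative only and not taken from the question's project:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class; the name is an assumption for illustration.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(KafkaProperties properties) {
        // Start from the Boot-managed settings so that spring.kafka.consumer.*
        // entries (group-id, auto-offset-reset, enable-auto-commit, ...) still apply.
        Map<String, Object> props = properties.buildConsumerProperties();

        // Illustrative extra setting layered on top of the Boot-managed ones.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);

        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```

If the factory were built from a hand-written map instead, the values in application.properties would be silently ignored, which is the usual reason auto-offset-reset appears to have no effect.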

UPDATE

OK. I see what's going on.

Try to add this property:

spring.kafka.consumer.enable-auto-commit=false

This way we won't have async auto-commits based on some commit interval.

The application exits right after latch.await(60, TimeUnit.SECONDS) returns, that is, as soon as the 3 expected records have been received. At that point the consumer's async auto-commit might not have happened yet, so the next time you run the application the consumer polls from the last committed offset and re-reads the uncommitted records.

When we turn off auto-commit, the listener container uses AckMode.BATCH, which commits synchronously, so we really do see only the latest records in the topic for this foo consumer group.
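
To make the role of auto-offset-reset concrete, here is a small plain-Java model (no Kafka client involved) of how a consumer picks its starting position: a valid committed offset for the group always wins, and the reset policy is consulted only when there is none or it is out of range. The class OffsetResetModel and the method startPosition are hypothetical names for this sketch, not part of Kafka or spring-kafka:

```java
import java.util.OptionalLong;

public class OffsetResetModel {

    /** The three values Kafka accepts for auto.offset.reset. */
    public enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Simplified model of where a consumer group starts reading a partition.
     * logStart is the first available offset, logEnd is the offset the next
     * produced record would get.
     */
    public static long startPosition(OptionalLong committed, long logStart,
                                     long logEnd, ResetPolicy policy) {
        // A committed offset that still exists on the server is used as-is.
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // Only now does the reset policy matter.
        if (policy == ResetPolicy.EARLIEST) {
            return logStart;
        }
        if (policy == ResetPolicy.LATEST) {
            return logEnd;
        }
        throw new IllegalStateException("no valid offset and auto.offset.reset=none");
    }

    public static void main(String[] args) {
        long logStart = 0L;
        long logEnd = 6L; // the topic holds offsets 0..5 from two earlier runs

        // Nothing committed for the group: the policy decides.
        System.out.println(startPosition(OptionalLong.empty(), logStart, logEnd, ResetPolicy.EARLIEST)); // 0
        System.out.println(startPosition(OptionalLong.empty(), logStart, logEnd, ResetPolicy.LATEST));   // 6

        // Offset 3 committed by a previous run: the policy is ignored.
        System.out.println(startPosition(OptionalLong.of(3L), logStart, logEnd, ResetPolicy.EARLIEST));  // 3
        System.out.println(startPosition(OptionalLong.of(3L), logStart, logEnd, ResetPolicy.LATEST));    // 3
    }
}
```

In this model, switching between earliest and latest changes nothing once the foo group has a committed offset, which matches the observation that the behaviour depends on whether offsets were committed rather than on auto-offset-reset.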

Upvotes: 2
