Reputation: 37034
KafkaProperties
java doc:
/**
* What to do when there is no initial offset in Kafka or if the current offset
* does not exist any more on the server.
*/
private String autoOffsetReset;
I have a hello-world application with the following application.properties:
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=latest
In this case the @KafkaListener
method is invoked for all entries, but I expected the @KafkaListener
method to be invoked only for the latest 3 records I send. I tried another option:
spring.kafka.consumer.auto-offset-reset=earliest
But the behaviour is the same.
Can you explain this?
code sample:
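import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        // The context is closed as soon as run(...) returns
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("spring_kafka_topic", "foo1");
        this.template.send("spring_kafka_topic", "foo2");
        this.template.send("spring_kafka_topic", "foo3");
        // Wait until the listener has seen 3 records (or 60 seconds pass)
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "spring_kafka_topic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}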
The behaviour doesn't depend on
spring.kafka.consumer.auto-offset-reset
(I get the same result with spring.kafka.consumer.auto-offset-reset=earliest); it depends only on
spring.kafka.consumer.enable-auto-commit
If I set spring.kafka.consumer.enable-auto-commit=false
- I see all records.
If I set spring.kafka.consumer.enable-auto-commit=true
- I see only the last 3 records.
Please clarify the meaning of the spring.kafka.consumer.auto-offset-reset
property.
Upvotes: 5
Views: 32120
Reputation: 121262
The KafkaProperties
in Spring Boot does this:
public Map<String, Object> buildProperties() {
    Map<String, Object> properties = new HashMap<String, Object>();
    if (this.autoCommitInterval != null) {
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                this.autoCommitInterval);
    }
    if (this.autoOffsetReset != null) {
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                this.autoOffsetReset);
    }
This buildProperties()
is called from buildConsumerProperties()
which, in turn, is used in:
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<Object, Object>(
            this.properties.buildConsumerProperties());
}
So, if you use your own ConsumerFactory
bean definition, be sure to reuse those KafkaProperties:
https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-props
UPDATE
OK. I see what's going on.
Try to add this property:
spring.kafka.consumer.enable-auto-commit=false
This way we won't have async auto-commits based on some commit interval.
The application exits right after latch.await(60, TimeUnit.SECONDS)
returns: once we get the 3
expected records, we exit. At that point the async auto-commit from the consumer might not have happened yet. So, the next time you run the application, the consumer polls data starting from the uncommitted offset.
When we turn off auto-commit, we have AckMode.BATCH
, where the commit is performed synchronously, so we really do see only the latest records in the topic for this foo
consumer group.
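To make the role of auto-offset-reset concrete, here is a minimal sketch in plain Java. It is not the Kafka client's code, only a simplified model of the documented rule for a single partition; the class OffsetResetSketch, the method startPosition and the offsets used are made up for illustration. The point is that the reset policy is consulted only when the group has no usable committed offset; otherwise the consumer resumes from whatever was last committed.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model; not part of Kafka or Spring.
public class OffsetResetSketch {

    /**
     * Returns the offset a consumer group would start reading from.
     * logStart is the first available offset, logEnd the next offset to be written.
     */
    public static long startPosition(OptionalLong committed, long logStart,
                                     long logEnd, String autoOffsetReset) {
        // A valid committed offset always wins; the reset policy is not consulted.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // No committed offset, or it no longer exists on the server.
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException(
                        "No valid offset and reset policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Assumed state: the topic holds offsets 0..9 and group "foo" never committed.
        System.out.println(startPosition(OptionalLong.empty(), 0, 10, "earliest")); // 0
        System.out.println(startPosition(OptionalLong.empty(), 0, 10, "latest"));   // 10
        // Assumed state: the last commit that reached the broker was offset 7,
        // because the previous run exited before a later commit happened.
        System.out.println(startPosition(OptionalLong.of(7), 0, 10, "latest"));     // 7
    }
}
```

In the last call the setting makes no difference: the consumer re-reads from offset 7, which matches the case where the application exits before the auto-commit runs and the same records are delivered again on the next start.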
Upvotes: 2