Reputation: 273
I have a Spring Boot application, which consumes messages from Apache Kafka through Apache Camel
public void init() throws Exception {
DefaultCamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("kafka:destination?brokers=<host>:9092&maxBlockMs=5000&reconnectBackoffMaxMs=2000")
.process(senderProcessor);
}
});
camelContext.start();
}
It works if the context is already started when messages arrive. But if messages were sent to Kafka before the application was started, the app doesn't consume those messages. I'm sure there must be the way to get them. Is it true?
Upvotes: 1
Views: 847
Reputation: 7025
There is a Kafka consumer setting called auto.offset.reset
and its default value is latest
.
In Camel-Kafka the setting is named autoOffsetReset
.
That means, that the consumer ignores all existing messages and only consumes new ones. If you set it to earliest
, the consumer consumes all existing messages.
Notice that this only matters when the consumer connects for the first time. Once it is connected and consumes messages, it should commit the offset back to the broker and use this saved offset whenever it reconnects.
Upvotes: 2