Reputation: 423
I trying to save the Kafka offset into file i use Spring boot it seems the offset is writing in the file but not reading so the fact camel will start reading from the beginning of the kafka topic at restart
@Component
public class Route extends RouteBuilder {
@Override
public void configure() throws Exception {
from(kafka())
.to("log:TEST?level=INFO")
.process(Route::commitKafka);
}
private String kafka() {
String kafkaEndpoint = "kafka:";
kafkaEndpoint += "topic";
kafkaEndpoint += "?brokers=";
kafkaEndpoint += "localhost:9092";
kafkaEndpoint += "&groupId=";
kafkaEndpoint += "TEST";
kafkaEndpoint += "&autoOffsetReset=";
kafkaEndpoint += "earliest";
kafkaEndpoint += "&autoCommitEnable=";
kafkaEndpoint += false;
kafkaEndpoint += "&allowManualCommit=";
kafkaEndpoint += true;
kafkaEndpoint += "&offsetRepository=";
kafkaEndpoint += "#fileStore";
return kafkaEndpoint;
}
@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
// This will be empty
// System.out.println(fileStateRepository.getCache());
return fileStateRepository;
}
private static void commitKafka(Exchange exchange) {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
}
}
Upvotes: 0
Views: 474
Reputation: 423
I finally found a solution but it does not appear in the documentation the start method must be called to init the repo at startup
@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
try {
fileStateRepository.start();
} catch (Exception e) {
e.printStackTrace();
}
return fileStateRepository;
}
Upvotes: 2