Michael
Michael

Reputation: 423

Camel kafka with Spring FileStateRepository

I am trying to save the Kafka offset to a file, using Spring Boot. The offset appears to be written to the file but is never read back, so Camel starts reading from the beginning of the Kafka topic on every restart.

/**
 * Camel route that consumes from a Kafka topic, logs every exchange and then
 * commits the Kafka offset manually. Offsets are kept in a file-backed state
 * repository registered as the {@code fileStore} bean.
 */
@Component
public class Route extends RouteBuilder {

    /**
     * Defines the route: Kafka endpoint -> {@code log:TEST} at INFO level -> manual commit.
     */
    @Override
    public void configure() throws Exception {
        from(kafka())
            .to("log:TEST?level=INFO")
            .process(exchange -> commitKafka(exchange));
    }

    /**
     * Assembles the Kafka endpoint URI piece by piece.
     *
     * @return the endpoint URI, with auto-commit disabled, manual commit allowed
     *         and the offset repository pointing at the {@code #fileStore} bean
     */
    private String kafka() {
        return new StringBuilder("kafka:")
            .append("topic")
            .append("?brokers=").append("localhost:9092")
            .append("&groupId=").append("TEST")
            .append("&autoOffsetReset=").append("earliest")
            .append("&autoCommitEnable=").append(false)
            .append("&allowManualCommit=").append(true)
            .append("&offsetRepository=").append("#fileStore")
            .toString();
    }

    /**
     * Creates the file-backed repository referenced by the endpoint's
     * {@code offsetRepository} option.
     *
     * <p>NOTE: the repository is returned exactly as the factory created it;
     * {@code start()} is not called on it in this method.
     *
     * @return a repository backed by {@code /kafka/offset_repo/repo.dat}
     */
    @Bean(name = "fileStore")
    private FileStateRepository fileStateRepository() {
        File offsetFile = new File("/kafka/offset_repo/repo.dat");
        // Inspecting the repository's getCache() at this point shows an empty cache.
        return FileStateRepository.fileStateRepository(offsetFile);
    }

    /**
     * Reads the manual-commit handle from the message header and commits synchronously.
     *
     * @param exchange the exchange carrying the {@code KafkaConstants.MANUAL_COMMIT} header
     */
    private static void commitKafka(Exchange exchange) {
        exchange.getIn()
            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class)
            .commitSync();
    }
}

Upvotes: 0

Views: 474

Answers (1)

Michael
Michael

Reputation: 423

I finally found a solution, although it does not appear in the documentation: the `start` method must be called to initialize the repository at startup.

/**
 * Creates and starts the file-backed state repository exposed as the
 * {@code fileStore} bean.
 *
 * <p>{@code start()} is invoked explicitly before the repository is returned;
 * without that call the repository is not initialised at startup.
 *
 * @return the started repository backed by {@code /kafka/offset_repo/repo.dat}
 * @throws IllegalStateException if the repository fails to start
 */
@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
    FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));

    try {
        fileStateRepository.start();
    } catch (Exception e) {
        // Fail fast: handing out a repository that never started would silently
        // reintroduce the very problem this start() call is meant to solve.
        throw new IllegalStateException("Failed to start the fileStore state repository", e);
    }

    return fileStateRepository;
}

Upvotes: 2

Related Questions