Frank

Reputation: 129

Check if StateStore is fully populated

I have a compacted topic with approximately 30 million keys. My app materializes this topic into a KeyValueStore.

How can I check whether the KeyValueStore is completely populated? If I look up a key via Interactive Queries, I need to know whether the key is missing because the StateStore is not ready yet or because the key really does not exist.

I materialize the StateStore this way:


  @Bean
  public Consumer<KTable<Key, Value>> process() {
    // Drop null values and materialize the filtered table into the store named "stateStore".
    return table -> table.filter((k, v) -> v != null,
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
            .withKeySerde(new KeySerde())
            .withValueSerde(new ValueSerde()));
  }

Upvotes: 4

Views: 1394

Answers (2)

Tuyen Luong

Reputation: 1366

Update: I had misread the OP's question, which asks how to check whether the topology has finished materializing the input topic into the state store, as a question about the state store restore process.

You can only get a KeyValueStore from your KafkaStreams instance once the KafkaStreams state has changed from REBALANCING to RUNNING. You can observe this state transition by using a StreamsBuilderFactoryBeanCustomizer to access the underlying KafkaStreams instance. If you just want to know when all state stores have been fully populated and the Kafka Streams threads are ready, so that you can get a KeyValueStore, you can register a StateListener:

@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
    return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
            // set a flag that the `stateStore` store of the current KafkaStreams instance has been fully restored;
            // from then on you can get the store and query it
        }
    });
}

Or, if you want to get the store from the KafkaStreams instance:

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                // get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
                // and set a flag that the `stateStore` store of the current KafkaStreams instance has been fully restored
            }
        });
    });
}

Read more in the docs.

Note that there should be only one instance of StreamsBuilderFactoryBeanCustomizer.
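To tie such a flag back to the original question (is a key missing because the store is not ready, or because it really does not exist?), here is a minimal sketch under stated assumptions, not Spring or Kafka API. `ReadinessAwareLookup`, `Result` and `onStateChange` are hypothetical names; the nested `State` enum is a local stand-in for a subset of `KafkaStreams.State` so the example compiles without Kafka on the classpath; and `storeGet` stands in for `ReadOnlyKeyValueStore#get`, which returns `null` for an absent key. In a real application the listener body shown above would do the equivalent of `onStateChange(newState, oldState)`.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class ReadinessAwareLookup<K, V> {

    // Stand-in for a subset of org.apache.kafka.streams.KafkaStreams.State (assumption:
    // used only so this sketch runs without Kafka on the classpath).
    public enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    // Hypothetical tri-state answer for a lookup.
    public enum Result { NOT_READY, ABSENT, PRESENT }

    private final AtomicBoolean ready = new AtomicBoolean(false);

    // Meant to be called from the StateListener with (newState, oldState).
    public void onStateChange(State newState, State oldState) {
        if (newState == State.RUNNING && oldState == State.REBALANCING) {
            ready.set(true);   // restoration finished, store can be queried
        } else if (newState != State.RUNNING) {
            ready.set(false);  // rebalancing or shutting down: treat store as unavailable
        }
    }

    // storeGet stands in for store.get(key); a null result means "key not present".
    public Result lookup(K key, Function<K, V> storeGet) {
        if (!ready.get()) {
            return Result.NOT_READY;
        }
        return storeGet.apply(key) == null ? Result.ABSENT : Result.PRESENT;
    }

    public static void main(String[] args) {
        Map<String, String> fakeStore = Map.of("a", "1");
        ReadinessAwareLookup<String, String> lookup = new ReadinessAwareLookup<>();
        System.out.println(lookup.lookup("a", fakeStore::get)); // NOT_READY
        lookup.onStateChange(State.REBALANCING, State.CREATED);
        lookup.onStateChange(State.RUNNING, State.REBALANCING);
        System.out.println(lookup.lookup("a", fakeStore::get)); // PRESENT
        System.out.println(lookup.lookup("b", fakeStore::get)); // ABSENT
    }
}
```

Clearing the flag whenever the instance leaves RUNNING is a design choice of this sketch: during a rebalance the store may be migrated and queries can fail with `InvalidStateStoreException`, so treating that window as "not ready" seems safer. Note that "ready" here only means restoration has finished; it does not guarantee that the input topic has been read to its end.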

Upvotes: -1

Matthias J. Sax

Reputation: 62330

In general, there is no such thing as "fully loaded": at any point after the application has started, new data might be written to the input topic, and that new data would be read to update the corresponding table.

What you can do is monitor consumer lag: within your application, KafkaStreams#metrics() gives you access to all client (i.e., consumer/producer) and Kafka Streams metrics. The consumer exposes a metric called records-lag-max that may help.

Of course, during normal processing (assuming new data is written to the input topic continuously), consumer lag will keep going up and down.
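To make the lag-based approach concrete, here is a small, Kafka-free sketch of a "caught up" check. Assumptions: the metrics are passed as (name, value) pairs instead of the `Map<MetricName, ? extends Metric>` that `KafkaStreams#metrics()` actually returns, so in a real application you would build that list from `MetricName#name()` and `Metric#metricValue()` (converted to a double); `LagCheck`, `isCaughtUp` and the `maxAllowedLag` threshold are hypothetical; and treating a missing or non-finite value as "not caught up" is a cautious choice, not documented Kafka behavior.

```java
import java.util.List;
import java.util.Map;

public class LagCheck {

    // Hypothetical helper: each entry is (metric name, value). There can be several
    // "records-lag-max" entries (e.g. one per consumer client or partition), so all are checked.
    public static boolean isCaughtUp(List<Map.Entry<String, Double>> metrics, double maxAllowedLag) {
        boolean sawLagMetric = false;
        for (Map.Entry<String, Double> m : metrics) {
            if (!"records-lag-max".equals(m.getKey())) {
                continue; // ignore unrelated metrics
            }
            Double lag = m.getValue();
            // Missing or non-finite values are treated as "unknown", i.e. not caught up.
            if (lag == null || lag.isNaN() || lag.isInfinite() || lag > maxAllowedLag) {
                return false;
            }
            sawLagMetric = true;
        }
        return sawLagMetric; // no lag metric at all: nothing fetched yet, so not caught up
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> sample = List.of(
                Map.entry("records-lag-max", 0.0),
                Map.entry("records-lag-max", 12.0),
                Map.entry("records-consumed-rate", 500.0));
        System.out.println(isCaughtUp(sample, 100)); // true
        System.out.println(isCaughtUp(sample, 5));   // false
    }
}
```

Because records-lag-max is a windowed maximum that keeps moving while new data arrives, a single `true` is only a snapshot; you might require the check to pass several times in a row before treating the store as populated.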

Upvotes: 4
