Reputation: 129
I have a compacted topic with approximately 30 million keys.
My app materializes this topic to a KeyValueStore.
How can I check whether the KeyValueStore is completely populated? If I look up a key via an interactive query, I need to know whether the key is missing because the StateStore is not ready yet or because the key is indeed not present.
I materialize the StateStore this way:
@Bean
public Consumer<KTable<Key, Value>> process() {
    return stream -> stream.filter((k, v) -> v != null,
            Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
                    .withKeySerde(new KeySerde())
                    .withValueSerde(new ValueSerde()));
}
Upvotes: 4
Views: 1394
Reputation: 1366
Update: I misread the OP's question, "how to check whether the topology has finished materializing the input topic into the state store", as a question about the state store restore process.
You can only get the KeyValueStore from your KafkaStreams instance once the KafkaStreams state has changed from REBALANCING to RUNNING.
You can observe this state transition by using a StreamsBuilderFactoryBeanCustomizer to access the underlying KafkaStreams instance.
If you just want to know when all state stores have been fully populated and the Kafka Streams thread is ready, so that you can get a KeyValueStore, then you can register a StateListener:
@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
    return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
            // Set a flag indicating that the `stateStore` store of the current KafkaStreams
            // instance has been fully restored; after that you can get the store.
        }
    });
}
Or, if you want to get the store from the KafkaStreams instance:
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                // Get and assign your store using
                // kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
                // then set a flag indicating that the `stateStore` store of the current
                // KafkaStreams instance has been fully restored.
            }
        });
    });
}
Note that there should be only one instance of StreamsBuilderFactoryBeanCustomizer.
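The flag mentioned in the comments above could be combined with the lookup itself, so that a caller can tell "store not ready" apart from "key not present". The following is only a minimal sketch under stated assumptions: GuardedStore and LookupStatus are hypothetical names that are not part of Kafka Streams or Spring, and the reader function stands in for whatever call you use to read from the store (for example key -> store.get(key)).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/** Three-way lookup result (hypothetical type, not a Kafka Streams class). */
enum LookupStatus { NOT_READY, ABSENT, FOUND }

/** Hypothetical wrapper that guards store reads behind a readiness flag. */
final class GuardedStore<K, V> {
    // Flipped by the state listener; false until the first REBALANCING -> RUNNING transition.
    private final AtomicBoolean ready = new AtomicBoolean(false);
    // Assumed to return null when the key is not in the store.
    private final Function<K, V> reader;

    GuardedStore(Function<K, V> reader) {
        this.reader = reader;
    }

    /** Call this from the state listener on REBALANCING -> RUNNING. */
    void markReady() {
        ready.set(true);
    }

    /** Call this when the instance leaves RUNNING (for example on a new rebalance). */
    void markNotReady() {
        ready.set(false);
    }

    /** Reports NOT_READY without touching the store while the flag is unset. */
    LookupStatus status(K key) {
        if (!ready.get()) {
            return LookupStatus.NOT_READY;
        }
        return reader.apply(key) == null ? LookupStatus.ABSENT : LookupStatus.FOUND;
    }
}
```

With this shape, the state listener would call markReady() inside the if block shown earlier, and the query endpoint would branch on the returned status instead of treating every null as a missing key.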
Upvotes: -1
Reputation: 62330
In general, there is no such thing as "fully loaded": at any point after the application has started, new data might be written to the input topic, and that data would be read to update the corresponding table.
What you can do is monitor consumer lag: within your application, KafkaStreams#metrics() gives you access to all client (i.e., consumer/producer) and Kafka Streams metrics. The consumer exposes a metric called records-lag-max that may help.
Of course, during normal processing (assuming that new data is written to the input topic all the time), consumer lag will go up and down all the time.
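As a rough illustration of how such a lag check might look, here is a minimal sketch. It assumes you have already collected the current records-lag-max readings into a plain collection, for example by iterating over KafkaStreams#metrics() and keeping the entries whose metric name is records-lag-max. LagCheck, caughtUp and the threshold are hypothetical names and values, and treating a NaN reading as "not caught up" is an assumption of this sketch, not documented Kafka behavior.

```java
import java.util.Collection;

/** Hypothetical helper; not part of Kafka Streams. */
final class LagCheck {
    private LagCheck() {
    }

    /**
     * Returns true only if at least one reading is present and every reading
     * is a real number no greater than maxAllowedLag.
     *
     * @param lagMaxValues  current records-lag-max readings, one per consumer of interest
     * @param maxAllowedLag hypothetical threshold below which the table counts as "caught up"
     */
    static boolean caughtUp(Collection<Double> lagMaxValues, double maxAllowedLag) {
        if (lagMaxValues.isEmpty()) {
            return false; // no readings yet, so nothing can be concluded
        }
        for (Double lag : lagMaxValues) {
            // Assumption: a missing or NaN reading means the lag is unknown.
            if (lag == null || lag.isNaN() || lag > maxAllowedLag) {
                return false;
            }
        }
        return true;
    }
}
```

Because lag keeps moving while new data arrives, a check like this is probably most useful once after startup, to decide when the initial backlog has been consumed, rather than as a permanent readiness signal.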
Upvotes: 4