Reputation: 344
I have a MapState where the keys are Instants (with nano precision), and in my use case I need to retrieve all of the keys that are within a period.
My code to day is to iterate all keys and filter out those that I don't need, so I was thinking I could keep a cache with all the keys I've saved so I can issue get
calls instead of using keys
, since my backend is RocksDB.
The question is: to make sure that the cache reflects the state, or at least doesn't miss anything, is it enough to initialize it in the open
function and then have it write-through to the backend? I need to be 100% sure that if a key exists in the state is for sure in the cache.
Upvotes: 0
Views: 680
Reputation: 43499
You have to somehow handle the case when open
is called during recovery. I suppose you might mark the cache as invalid during open
, and then each time you use the cache, first call keys
to update the cache if it is invalid.
The reason this is a bit tricky is that MapState is a kind of key-partitioned state (where the key in this context is the key of the KeyedStream, rather than a key in a MapState's key/value store), and there's a separate MapState for each distinct key in the KeyedStream. When open
is called, you can't access any of the MapState instances (let alone call the keys
method on them), nor do you have any information about which of the KeyedStream's keys are in the key groups assigned to the instance being opened. This may result in the cache being somewhat awkward to work with, as each instance of the cache will have to handle all of the entries from all of the MapStates present in that instance of the operator.
Hopefully binary sorted state will be ready for Flink 1.17; it should make this sort of caching unnecessary. See the slides from our Flink Forward talk for more information.
Upvotes: 1