Reputation: 3356
We want to keep the last n unique IDs in a Flink operator's state. When the (n+1)th unique ID arrives, we want to keep it and drop the oldest unique ID in the state, in order to avoid an ever-growing state.
We already have a TTL (expiration time) mechanism in place; the size limit is another restriction we're looking to add.
Not every element holds a unique ID.
Does Flink provide an API that limits the number of elements in the state?
The state in question is a MapState with a StateTtlConfig-generated TTL/expiration mechanism.
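For reference, the kind of TTL-enabled MapState described above is usually set up roughly like this in a rich/keyed function's open() method (a minimal sketch; the descriptor name, key/value types, and one-hour TTL are placeholders, not the actual values in use):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Inside open() of a RichFunction / KeyedProcessFunction:
MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("seen-ids", String.class, Long.class);

// The TTL/expiration mechanism: entries expire some time after they were last written.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))   // placeholder expiration time
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
descriptor.enableTimeToLive(ttlConfig);

MapState<String, Long> seenIds = getRuntimeContext().getMapState(descriptor);

The TTL takes care of expiring old entries over time, but nothing here caps the number of entries, which is what the question is about.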
Upvotes: 1
Views: 498
Reputation: 531
I think you can create this functionality yourself by implementing a sort of "Flink-adjusted" version of Java's LinkedHashMap.
You have to remember that MapState is not exactly a classic Java Map, but an elaborate implementation of it that enables State management.
Essentially - I think you can implement such a thing using a combination of MapState and ListState, following some guidelines on how to implement this - something along the lines of this example: Java tips and tricks.: Queue Map Hybrid -- Creating Data Structures in Java (techtipsjava.blogspot.com)
Regarding TTL - I am not sure how key removal (because an entry timed out) would affect such a structure; this has to be thought through carefully.
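To make that idea a bit more concrete, here is a rough sketch of one way to combine MapState and ListState into a size-bounded, insertion-ordered structure (the state names, the String ids, the stored timestamp value, and the MAX_SIZE constant are all assumptions for illustration, not part of the answer):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class QueueMapHybridFunction extends KeyedProcessFunction<String, String, String> {

    private static final int MAX_SIZE = 100;              // the "n" from the question (assumed)

    private transient MapState<String, Long> valueById;   // the "map" part: id -> last-seen time
    private transient ListState<String> insertionOrder;   // the "queue" part: ids, oldest first

    @Override
    public void open(Configuration parameters) {
        valueById = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("value-by-id", String.class, Long.class));
        insertionOrder = getRuntimeContext().getListState(
                new ListStateDescriptor<>("insertion-order", String.class));
    }

    @Override
    public void processElement(String id, Context ctx, Collector<String> out) throws Exception {
        if (!valueById.contains(id)) {
            // New unique id: append it to the queue part.
            insertionOrder.add(id);

            // Rebuild the queue, evicting the oldest ids if the size limit was exceeded.
            List<String> order = new ArrayList<>();
            Iterable<String> current = insertionOrder.get();
            if (current != null) {
                current.forEach(order::add);
            }
            while (order.size() > MAX_SIZE) {
                valueById.remove(order.remove(0));         // drop the oldest entry from the map too
            }
            insertionOrder.update(order);
        }
        valueById.put(id, System.currentTimeMillis());     // placeholder value stored per id
        out.collect(id);
    }
}

Note that the TTL caveat above still applies: if a StateTtlConfig expires entries in the MapState, the corresponding ids would linger in the ListState until they are evicted by size, so the two states can drift apart.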
Upvotes: 1
Reputation: 43499
While this isn't directly provided, you could achieve this with MapState<Long, Event> plus a couple of additional ValueState<Long> values to keep track of the currently active range of indexes into the MapState.
As events arrive, do something roughly like this (but using Flink state rather than this pseudocode):
map[nextIndex++] = thisEvent;        // store the new event at the next free index
if (nextIndex - oldestIndex > n) {   // more than n events are now retained,
    map.remove(oldestIndex++);       // so drop the oldest one
}
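One possible translation of this into actual Flink keyed state (a sketch only, assuming a KeyedProcessFunction; N, the state names, and the Event placeholder type are illustrative choices, not part of the answer):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder for the application's event type (an assumption for this sketch).
class Event {
    String uniqueId;
}

public class LastNFunction extends KeyedProcessFunction<String, Event, Event> {

    private static final long N = 100;                // how many events to retain (assumed)

    private transient MapState<Long, Event> events;   // index -> event
    private transient ValueState<Long> nextIndex;     // index the next event will be written to
    private transient ValueState<Long> oldestIndex;   // oldest index still retained

    @Override
    public void open(Configuration parameters) {
        events = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("events", Long.class, Event.class));
        nextIndex = getRuntimeContext().getState(
                new ValueStateDescriptor<>("next-index", Long.class));
        oldestIndex = getRuntimeContext().getState(
                new ValueStateDescriptor<>("oldest-index", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        long next = nextIndex.value() == null ? 0L : nextIndex.value();
        long oldest = oldestIndex.value() == null ? 0L : oldestIndex.value();

        events.put(next++, event);        // map[nextIndex++] = thisEvent

        if (next - oldest > N) {          // over the limit:
            events.remove(oldest++);      // drop the oldest entry
        }

        nextIndex.update(next);
        oldestIndex.update(oldest);
        out.collect(event);
    }
}

Because only the single oldest entry is touched on each eviction, the per-event state access stays constant regardless of n.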
Upvotes: 1
Reputation: 437
I don't think Flink has a state type that would support this out of the box. The closest thing I can think of is to use ListState. With ListState you can append elements like you would with a regular list.
For your use case, you would read the state by calling .get(), which gives you an iterable you can iterate over, remove the item you'd like to drop, and then push the state back.
From a performance perspective, the iteration may not be ideal, but on the other hand it would not be significant in comparison to accessing state from disk (in case you're using RocksDB as a state backend), which incurs a heavy cost due to serialization and deserialization.
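A minimal sketch of that read-modify-write pattern, assuming a KeyedProcessFunction and treating the state name and the MAX_IDS limit as placeholder choices:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LastNIdsFunction extends KeyedProcessFunction<String, String, String> {

    private static final int MAX_IDS = 100;          // the size limit "n" (assumed)

    private transient ListState<String> recentIds;   // unique ids, oldest first

    @Override
    public void open(Configuration parameters) {
        recentIds = getRuntimeContext().getListState(
                new ListStateDescriptor<>("recent-ids", String.class));
    }

    @Override
    public void processElement(String id, Context ctx, Collector<String> out) throws Exception {
        // Read the current list, append the new id, drop the oldest entries while over
        // the limit, then push the whole list back into state.
        List<String> ids = new ArrayList<>();
        Iterable<String> current = recentIds.get();
        if (current != null) {
            current.forEach(ids::add);
        }
        ids.add(id);
        while (ids.size() > MAX_IDS) {
            ids.remove(0);
        }
        recentIds.update(ids);
        out.collect(id);
    }
}

Every element triggers a full read and rewrite of the list, which is the iteration cost mentioned above.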
Upvotes: 1