Michael

Reputation: 3356

Flink capped MapState

Background

We want to keep the last n unique IDs in a Flink operator's state. When the (n+1)-th unique ID arrives, we want to keep it and drop the oldest unique ID in the state. This is to avoid ever-growing state.

We already have a TTL (expiration time) mechanism in place; the size limit is an additional restriction we want to add.

Not every element carries a unique ID.

Question

Does Flink provide an API that limits the number of elements in the state?

Things tried

  1. Using MapState with a TTL/expiration mechanism configured via StateTtlConfig.
  2. Using a window: this limited the number of processed elements, but not the number of elements in state.

Upvotes: 1

Views: 498

Answers (3)

Yoav R.

Reputation: 531

I think you can build this functionality yourself by implementing a sort of "Flink-adjusted" version of Java's LinkedHashMap.
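For reference, this is roughly what the plain, in-memory LinkedHashMap version of "keep the last n unique IDs" looks like. It is only a sketch of the target semantics, not Flink code: the class name `CappedIdSet` is hypothetical, and the state lives in ordinary JVM memory rather than in Flink-managed state. It relies on LinkedHashMap's documented `removeEldestEntry` hook, which `put` calls after inserting a new mapping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory illustration (not Flink state): keeps the last
// `capacity` unique IDs and evicts the oldest-inserted ID when the cap is exceeded.
class CappedIdSet<K> {
    private final int capacity;
    private final LinkedHashMap<K, Boolean> ids;

    CappedIdSet(int capacity) {
        this.capacity = capacity;
        // accessOrder = false: iteration order is insertion order, so the
        // "eldest" entry is the oldest ID still present.
        this.ids = new LinkedHashMap<K, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                // Invoked by put() after a new mapping is inserted;
                // returning true drops the eldest entry.
                return size() > CappedIdSet.this.capacity;
            }
        };
    }

    /** Records an ID; an ID that is already present keeps its original position. */
    void add(K id) {
        if (!ids.containsKey(id)) {
            ids.put(id, Boolean.TRUE);
        }
    }

    boolean contains(K id) { return ids.containsKey(id); }

    int size() { return ids.size(); }
}
```

With a capacity of 2, adding "a", "b", "c" leaves only "b" and "c"; re-adding "b" does not refresh it, so a following "d" evicts "b".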

Keep in mind that MapState is not a classic Java Map (it does not implement java.util.Map), but a map-like interface to state that Flink manages for you.

Essentially, I think you can implement such a thing using a combination of MapState and ListState, following some guidelines along the lines of this example: Java tips and tricks.: Queue Map Hybrid -- Creating Data Structures in Java (techtipsjava.blogspot.com)
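A minimal plain-Java model of that MapState + ListState hybrid might look like the following. Assumptions: a `HashMap` stands in for a `MapState<String, Boolean>` (fast membership checks by ID) and an `ArrayList` stands in for a `ListState<String>` (insertion order); the class `QueueMapIds` is hypothetical, and a real operator would use Flink keyed state handles instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a "queue-map hybrid":
//   membership -> stands in for MapState<String, Boolean> (lookup by ID)
//   order      -> stands in for ListState<String> (oldest ID first)
class QueueMapIds {
    private final int capacity;
    private final Map<String, Boolean> membership = new HashMap<>();
    private List<String> order = new ArrayList<>();

    QueueMapIds(int capacity) { this.capacity = capacity; }

    /** Returns true if the ID was new and has been recorded. */
    boolean offer(String id) {
        if (membership.containsKey(id)) {
            return false; // already tracked: leave state unchanged
        }
        membership.put(id, Boolean.TRUE);
        order.add(id); // analogous to ListState.add(id)
        // Note (assumption): ListState has no size(); a real implementation
        // would keep a count elsewhere or count while iterating.
        if (order.size() > capacity) {
            // Evict the oldest ID. With ListState this means reading the list,
            // dropping the head, and writing the remainder back.
            String oldest = order.get(0);
            order = new ArrayList<>(order.subList(1, order.size()));
            membership.remove(oldest);
        }
        return true;
    }

    boolean contains(String id) { return membership.containsKey(id); }

    List<String> idsInOrder() { return new ArrayList<>(order); }
}
```

The two structures must stay in sync, which is exactly where the TTL concern below comes in: if TTL expires an entry in one but not the other, they drift apart.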

Regarding TTL: I am not sure how key removal (because a key timed out) would affect such a structure; this has to be thought through carefully.

Upvotes: 1

David Anderson

Reputation: 43499

While this isn't directly provided, you could achieve this with MapState<Long, Event> plus a couple of additional ValueState<Long> values to keep track of the currently active range of indexes into the MapState.

As events arrive, do something roughly like this (but using Flink state rather than this pseudocode):

map.put(nextIndex++, thisEvent);
// once more than n entries are held, evict the oldest one
if (nextIndex - oldestIndex > n) {
  map.remove(oldestIndex++);
}
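A slightly fuller plain-Java model of this index-range idea is sketched below. Assumptions: a `HashMap<Long, E>` stands in for `MapState<Long, Event>`, and two nullable `Long` fields stand in for the two `ValueState<Long>` handles (an unset ValueState returns null, so the model treats null as 0). The class `LastNEvents` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the index-range approach:
//   events      -> stands in for MapState<Long, E>
//   nextIndex   -> stands in for ValueState<Long> (next slot to write)
//   oldestIndex -> stands in for ValueState<Long> (oldest slot still stored)
class LastNEvents<E> {
    private final long n;
    private final Map<Long, E> events = new HashMap<>();
    private Long nextIndex;   // null until the first event, like an unset ValueState
    private Long oldestIndex;

    LastNEvents(long n) { this.n = n; }

    void add(E event) {
        long next = nextIndex == null ? 0L : nextIndex;
        long oldest = oldestIndex == null ? 0L : oldestIndex;

        events.put(next, event);
        next++;
        if (next - oldest > n) {
            events.remove(oldest); // evict the oldest slot
            oldest++;
        }
        // Write the counters back, as you would with ValueState.update().
        nextIndex = next;
        oldestIndex = oldest;
    }

    /** Returns the stored events from oldest to newest. */
    List<E> snapshot() {
        List<E> out = new ArrayList<>();
        if (nextIndex == null) {
            return out;
        }
        for (long i = oldestIndex; i < nextIndex; i++) {
            out.add(events.get(i));
        }
        return out;
    }

    int size() { return events.size(); }
}
```

As written, this keeps the last n events rather than the last n unique IDs; deduplicating would additionally require a lookup by ID before inserting.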

Upvotes: 1

Joud C

Reputation: 437

I don't think Flink has a state type that would support this out of the box. The closest thing I can think of is to use ListState. With ListState you can append elements like you would a regular list.

For your use case, you would call .get() on the state, which returns an Iterable; iterate over it, drop the item you want to remove, and then write the remaining elements back to the state.
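The read-modify-write cycle can be modelled in plain Java as follows. Assumptions: the private `get()`/`update(List)` methods mimic `ListState.get()` (returns an Iterable) and `ListState.update(List)` (replaces the whole contents); the class `ListStateIds` is hypothetical, and in a real operator the list would be a Flink `ListState<String>`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: `stored` stands in for a Flink ListState<String>.
class ListStateIds {
    private final int capacity;
    private List<String> stored = new ArrayList<>();

    ListStateIds(int capacity) { this.capacity = capacity; }

    // Mimics ListState.get(): hands back the current contents as an Iterable.
    private Iterable<String> get() { return stored; }

    // Mimics ListState.update(List): overwrites the whole list.
    private void update(List<String> values) { stored = new ArrayList<>(values); }

    void add(String id) {
        List<String> ids = new ArrayList<>();
        for (String existing : get()) {   // read and iterate the current state
            if (existing.equals(id)) {
                return;                   // already tracked: leave state unchanged
            }
            ids.add(existing);
        }
        ids.add(id);
        if (ids.size() > capacity) {
            ids.remove(0);                // drop the oldest ID
        }
        update(ids);                      // push the state back
    }

    List<String> current() { return new ArrayList<>(stored); }
}
```

Every insert touches the whole list, which is the iteration cost discussed next.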

From a performance perspective, the iteration may not be ideal, but its cost is unlikely to be significant compared to accessing state on disk (if you're using RocksDB as the state backend), which incurs a heavy serialization and deserialization cost.

Upvotes: 1
