Reputation: 1
I am trying to understand conceptually how MapState behaves. The official docs seem to conflict with the behavior I am seeing in my deployment.
In a Keyed stream, I expect a function with a MapState will have different contents for each stream key of the element it is processing. However, I am seeing a single Map shared across an entire key group, e.g. the whole worker.
I need help reconciling the behavior we are seeing with how the docs assert that MapState works.
For example, we have a KeyedProcessFunction with a MapState initialized like this:
    @Transient
    var cumulativeSpendMap: MapState<Int, Long>? = null // Key int is the day of the month

    override fun open(parameters: Configuration?) {
        val mapStateDescriptor = MapStateDescriptor(
            "ProcessSomeEvent.cumulativeSpend",
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO
        )
        cumulativeSpendMap = runtimeContext.getMapState(mapStateDescriptor)
    }
The docs say that:
The key is automatically supplied by the system, so the function always sees the value mapped to the key of the current element
but in my experiments the Map is shared across all keys on a single worker, meaning it belongs to the entire KeyGroup, and different stream keys are free to overwrite each other's values.
Does that mean a single map is shared between all elements processed on a given TaskManager instance, regardless of the element's key?
More specifically, do I need to explicitly include the stream key as part of the key to the values stored in the Map?
What am I misunderstanding here?
Upvotes: 0
Views: 806
Reputation: 43499
It's unfortunately easy to get lost in the terminology here. Hopefully I can clear this up.
Flink offers several types of keyed state (sometimes referred to as key-partitioned state), including:
ValueState
ListState
MapState
These each maintain state in a key-value store that is indexed by the keys of the KeyedStream being processed. In other words,
ValueState is a map from keys to values
ListState is a map from keys to lists
MapState is a map from keys to maps
In each case, the outer map is a distributed, sharded map, with each task slot managing the shard for the key groups assigned to that slot. In the case of MapState, the values in the key-value store are themselves maps (using some other, application-specific, set of keys).
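To make the "map from keys to maps" picture concrete, here is a minimal toy model in plain Kotlin. It is not Flink code and does not use any Flink API: ToyKeyedMapState, currentKey, and the "user-a"/"user-b" keys are made-up names for illustration, and setting currentKey by hand stands in for what Flink does implicitly before it hands each element to your function.

```kotlin
// Toy model only: NOT Flink's implementation or API, just nested maps.
// The outer map is indexed by the stream key; each stream key owns its own inner map.
class ToyKeyedMapState<K : Any, UK, UV> {
    private val store = mutableMapOf<K, MutableMap<UK, UV>>()

    // In Flink the current key is set by the runtime for each element;
    // in this sketch it is set by hand (an assumption of the model).
    var currentKey: K? = null

    // Returns the inner map that belongs to the current stream key.
    private fun scoped(): MutableMap<UK, UV> {
        val key = checkNotNull(currentKey) { "no current key set" }
        return store.getOrPut(key) { mutableMapOf() }
    }

    fun get(userKey: UK): UV? = scoped()[userKey]

    fun put(userKey: UK, value: UV) {
        scoped()[userKey] = value
    }
}

fun main() {
    // Stream key: a hypothetical account id. Inner key: day of the month.
    val spend = ToyKeyedMapState<String, Int, Long>()

    spend.currentKey = "user-a"
    spend.put(15, 100L)

    spend.currentKey = "user-b"
    println(spend.get(15)) // null: user-b has its own, still empty, inner map
    spend.put(15, 7L)

    spend.currentKey = "user-a"
    println(spend.get(15)) // 100: user-b's write did not touch user-a's entry
}
```

In this model, two stream keys that happen to live in the same shard still see separate inner maps, so the day-of-month key does not need to be combined with the stream key.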
In your case, I believe ValueState is what you want to be using. For more on this, including an example, see the tutorial in the Flink docs.
Upvotes: 2