Michael Kniffen

Conceptual question about MapState behavior in Apache Flink

I am trying to understand the behavior of MapState conceptually; the official docs seem to conflict with the behavior I am seeing in my deployment.

In a keyed stream, I expect a function with a MapState to have different contents for each stream key of the element it is processing. However, I am seeing a single map shared across an entire key group, i.e. the whole worker.

I need help reconciling the behavior we are seeing with how the docs assert that MapState works.

For example, we have a KeyedProcessFunction with a MapState initialized like this:


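```kotlin
import org.apache.flink.api.common.state.MapState
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.configuration.Configuration

// Inside the KeyedProcessFunction subclass:
@Transient
var cumulativeSpendMap: MapState<Int, Long>? = null // Key int is the day of the month

override fun open(parameters: Configuration?) {
  // Register the MapState (day of month -> cumulative spend) with the runtime.
  val mapStateDescriptor = MapStateDescriptor(
      "ProcessSomeEvent.cumulativeSpend",
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO
  )
  cumulativeSpendMap = runtimeContext.getMapState(mapStateDescriptor)
}
```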

The docs say that:

The key is automatically supplied by the system, so the function always sees the value mapped to the key of the current element

But in my experiments, the map is shared across all keys on a single worker, meaning it belongs to the entire key group, and different stream keys are free to overwrite each other's values.

Does that mean a single map is shared between all elements processed on a given TaskManager instance, regardless of the element's key?

More specifically, do I need to explicitly include the stream key as part of the key to the values stored in the Map?

What am I misunderstanding here?

Answers (1)

David Anderson

It's unfortunately easy to get lost in the terminology here. Hopefully I can clear this up.

Flink offers three types of keyed state (sometimes referred to as key-partitioned state):

  • ValueState
  • ListState
  • MapState

These each maintain state in a key-value store that is indexed by the keys of the KeyedStream being processed. In other words,

  • ValueState is a map from keys to values
  • ListState is a map from keys to lists
  • MapState is a map from keys to maps
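The "map from keys to maps" view can be sketched in plain Kotlin, with no Flink dependency. This is a conceptual model only: the class name `KeyedMapStateModel` and its `currentKey` property are hypothetical stand-ins for the state backend, which sets the current key before each `processElement` call. Using the question's day-of-month example, two stream keys can write to the same day without overwriting each other:

```kotlin
// Conceptual model of keyed MapState; no Flink classes are used.
// Assumption: `currentKey` stands in for the key the runtime sets before
// each element is processed. The class name is hypothetical.
class KeyedMapStateModel<K : Any, UK, UV> {
    // Outer map: stream key -> that key's own inner map (what MapState exposes).
    private val store = mutableMapOf<K, MutableMap<UK, UV>>()
    var currentKey: K? = null

    // Returns the inner map for the current stream key, creating it if needed.
    private fun inner(): MutableMap<UK, UV> {
        val key = checkNotNull(currentKey) { "no current key set" }
        return store.getOrPut(key) { mutableMapOf() }
    }

    fun get(userKey: UK): UV? = inner()[userKey]
    fun put(userKey: UK, value: UV) { inner()[userKey] = value }
}

fun main() {
    // Day of month -> cumulative spend, as in the question.
    val cumulativeSpend = KeyedMapStateModel<String, Int, Long>()

    cumulativeSpend.currentKey = "alice"
    cumulativeSpend.put(5, 100L)

    cumulativeSpend.currentKey = "bob"
    cumulativeSpend.put(5, 40L) // same user key (day 5), different stream key

    cumulativeSpend.currentKey = "alice"
    println(cumulativeSpend.get(5)) // prints 100: bob's write did not overwrite it
}
```

Because the scoping happens in the outer map, the stream key does not need to be folded into the MapState's own keys; the inner keys only have to be unique within one stream key.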

In each case, the outer map is a distributed, sharded map, with each task slot managing the shard for the key groups assigned to that slot. In the case of MapState, the values in the key-value store are themselves maps (using some other, application-specific, set of keys).
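The sharding can be sketched as simple arithmetic. Flink's own routing lives in `KeyGroupRangeAssignment` and applies a murmur hash to `key.hashCode()` first; this sketch assumes a plain `Math.floorMod` instead, and the function names `keyGroupFor`/`operatorIndexFor` and the values 128 and 2 are hypothetical, chosen for illustration. It shows that two keys can share a key group and a slot while their state stays separate:

```kotlin
// Simplified model of how keyed state is spread across parallel instances.
// Assumption: real Flink murmur-hashes key.hashCode() before the modulo;
// this sketch uses Math.floorMod(key.hashCode(), maxParallelism) directly.
fun keyGroupFor(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode(), maxParallelism)

// Each parallel instance (slot) owns a contiguous range of key groups.
fun operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

fun main() {
    val maxParallelism = 128
    val parallelism = 2

    // State held by one slot: still indexed by stream key, not by key group.
    val slotState = mutableMapOf<Int, MutableMap<Int, Long>>()

    // Stream keys 5 and 133 land in the same key group (and so the same slot)...
    for ((streamKey, spend) in listOf(5 to 100L, 133 to 250L)) {
        val group = keyGroupFor(streamKey, maxParallelism)
        val slot = operatorIndexFor(group, maxParallelism, parallelism)
        println("key=$streamKey keyGroup=$group slot=$slot")
        // ...but each key gets its own inner map, so day 1 is tracked separately.
        slotState.getOrPut(streamKey) { mutableMapOf() }[1] = spend
    }
    println(slotState) // {5={1=100}, 133={1=250}}
}
```

Key groups only decide which slot owns a key's state; inside that slot, lookups are still made by stream key.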

In your case, I believe ValueState is what you want to be using. For more on this, including an example, see the tutorial in the Flink docs.
