Reputation: 141
I am using BroadcastState to perform streaming computation in Flink. I have defined a class extending KeyedBroadcastProcessFunction for my job. Say I have a stream A, which is keyed by (user_id, location), and a stream B, which is broadcast to all executors to process elements in A using my class. I understand I can register a timer in processBroadcastElement or processElement in this class so that, when it fires, I can delete the associated state for a specific key group by calling state.clear(). I wonder whether this key group still exists after that.
For example, in stream A, a new message arrives with (user_id=1, location='usa'), and the key group and its associated state are created. After that, if another message with (user_id=1, location='usa') arrives, it will trigger processElement() and emit a result.
Say that after 24 hours I'm no longer interested in the key group (user_id=1, location='usa'). I can register a timer to clear the associated state, but I have no control over the key group itself. As a result, when another message with (user_id=1, location='usa') arrives after 24 hours, processElement() will still be invoked, since the key group still exists. As the job runs, although the associated state is cleared after 24 hours, will key groups accumulate, or is that not a concern for memory usage?
Relevant blogs: https://www.da-platform.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
Upvotes: 1
Views: 1800
Reputation: 43707
Flink's keyed state is organized as a distributed (or sharded) key-value store, where the keys can be simple things, like integers and strings, or composites, like (user_id=1, location='usa'). Key groups are something different from composite keys. A key group is a runtime construct that was introduced in Flink 1.2 (see FLINK-3755) to permit efficient rescaling of key-value state. A key group is a subset of the key space, and is checkpointed as an independent unit. At runtime, all of the keys in the same key group are partitioned together in the job graph: each subtask has the key-value state for one or more complete key groups. The number of key groups is fixed (it equals the job's maximum parallelism), so key groups do not accumulate as new keys arrive. This design doc gives more details. As a user working with the DataStream API, key groups are an implementation detail, and not something you work with directly.
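To make the distinction between keys and key groups concrete, here is a minimal plain-Java sketch. It is not Flink's actual code: the class and method names (KeyGroupSketch, keyGroupFor, subtaskFor) are made up for illustration, and the hash is a simple stand-in where Flink, as far as I know, applies a murmur hash to the key's hashCode. The point it tries to show is only that any number of distinct keys land in a bounded set of key groups.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of key-group assignment. Illustrative only, not Flink's implementation. */
final class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key groups (stand-in hash, an assumption). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel subtask that owns it. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups, fixed for the job
        int parallelism = 4;      // number of parallel subtasks

        // Feed in many distinct composite keys and record which key groups they hit.
        Set<Integer> seenKeyGroups = new HashSet<>();
        for (int userId = 0; userId < 100_000; userId++) {
            String compositeKey = userId + "|usa";
            seenKeyGroups.add(keyGroupFor(compositeKey, maxParallelism));
        }

        System.out.println("distinct keys: 100000");
        System.out.println("distinct key groups: " + seenKeyGroups.size()
                + " (never more than " + maxParallelism + ")");
        System.out.println("key group 127 lives on subtask "
                + subtaskFor(127, maxParallelism, parallelism));
    }
}
```

In this model, what grows with the number of users is the per-key state, not the key groups, which is why clearing state for keys you no longer care about is the part that matters for memory.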
As for timers in a KeyedBroadcastProcessFunction, they can be registered in the processElement or onTimer method, but not in the processBroadcastElement method. This is because timers are always associated with a key, and there is no key associated with a broadcast element. You can, however, manipulate any or all of the keyed state during your processBroadcastElement method by using the applyToKeyedState method on the KeyedBroadcastProcessFunction.Context object. See the docs for more details.
Once you call state.clear(), the state entry for that key is deleted. New stream events for that key may, of course, arrive after the state has been cleared, and you are able to once again store value state for that key, if you wish. In order to avoid unbounded memory usage due to keeping state for no-longer-relevant keys, you do need to be careful. You might want some logic like this to expire the state 24 hours after each time it is created:
    processElement:
        if state.value() is null, register timer
        state.update(...)
    onTimer:
        state.clear()
Or you might need more complex logic that extends the lifetime of the state whenever it is updated or accessed.
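The basic pattern above can be modeled in plain Java without any Flink dependency. This is only a sketch under stated assumptions: the two HashMaps stand in for per-key ValueState and for the timer service, advanceTimeTo plays the role of Flink firing onTimer, and all names (ExpiringStateSketch, TTL_MS, valueOf, and so on) are hypothetical. In a real job the same logic would live in the processElement and onTimer methods of your KeyedBroadcastProcessFunction.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of "register a timer on first write, clear the state when it fires". */
final class ExpiringStateSketch {
    static final long TTL_MS = 24L * 60 * 60 * 1000; // 24 hours

    private final Map<String, Long> state = new HashMap<>();  // stands in for ValueState, per key
    private final Map<String, Long> timers = new HashMap<>(); // stands in for registered timers

    /** Models processElement: the timer is registered only when the state is first created. */
    void processElement(String key, long value, long now) {
        if (!state.containsKey(key)) {
            timers.put(key, now + TTL_MS);
        }
        state.merge(key, value, Long::sum);
    }

    /** Models onTimer: every timer due at or before 'now' fires and clears that key's state. */
    void advanceTimeTo(long now) {
        timers.entrySet().removeIf(timer -> {
            if (timer.getValue() <= now) {
                state.remove(timer.getKey()); // the state.clear() step
                return true;
            }
            return false;
        });
    }

    /** Returns the stored value for the key, or null if there is no state for it. */
    Long valueOf(String key) {
        return state.get(key);
    }

    /** Number of keys that currently hold state. */
    int trackedKeys() {
        return state.size();
    }

    public static void main(String[] args) {
        ExpiringStateSketch job = new ExpiringStateSketch();
        job.processElement("1|usa", 5, 0);
        job.processElement("1|usa", 7, 1_000);
        System.out.println("before expiry: " + job.valueOf("1|usa"));
        job.advanceTimeTo(TTL_MS);
        System.out.println("after expiry: " + job.valueOf("1|usa"));
        job.processElement("1|usa", 3, TTL_MS + 10);
        System.out.println("key seen again: " + job.valueOf("1|usa"));
    }
}
```

Note that the second event for the same key does not register a new timer, so in this model the state expires 24 hours after it was created, not 24 hours after it was last updated. Extending the lifetime on each update would mean replacing the timer in processElement.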
Another option would be to use the state time-to-live feature.
Update:
Whenever you are in a processElement
or onTimer
method of any of the ProcessFunction types, there is a specific key implicitly in context, and anything done to keyed state (such as .update()
or .clear()
) will only affect the state for that one key.
Broadcast state works differently. Broadcast state is always MapState, and is replicated into all of the parallel subtasks. Broadcast state is keyless: if you read broadcast state during the processElement method, you will see the same value for the broadcast state regardless of what key is in context during that call.
Only in the processBroadcastElement method of a KeyedBroadcastProcessFunction can you modify (or clear) broadcast state, and it's important that whatever modifications (or deletions) occur be done in the same way in all of the parallel instances. It is designed this way to guarantee that every parallel instance will have the same contents in broadcast state. Ignoring this rule will lead to inconsistencies in the state, which can be very difficult to debug. See the docs for more info.
So yes, if you call .clear() on the broadcast state, then all of the broadcast state for all keys will be removed. Or you might remove a specific item from the broadcast state (remember, broadcast state is MapState), in which case that specific item will be removed for all keys.
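The difference between the two kinds of state can be pictured with a small plain-Java model. This is a sketch and not Flink code: one HashMap stands in for the broadcast MapState held by a single subtask, another stands in for keyed state, and every name here (BroadcastVsKeyedSketch, clearKeyedState, the "threshold" rule) is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of one subtask holding both broadcast state and keyed state. */
final class BroadcastVsKeyedSketch {
    // Broadcast state: a single map, visible to every key processed by this subtask.
    final Map<String, String> broadcastState = new HashMap<>();
    // Keyed state: one independent entry per key.
    final Map<String, Integer> keyedState = new HashMap<>();

    /** Models processBroadcastElement: the only place broadcast state is modified. */
    void processBroadcastElement(String ruleName, String rule) {
        if (rule == null) {
            broadcastState.remove(ruleName); // removes the item for all keys
        } else {
            broadcastState.put(ruleName, rule);
        }
    }

    /** Models processElement: updates state for one key, only reads broadcast state. */
    String processElement(String key, String ruleName) {
        int count = keyedState.merge(key, 1, Integer::sum);
        return key + " #" + count + " rule=" + broadcastState.get(ruleName);
    }

    /** Models state.clear() with a key in context: affects that key only. */
    void clearKeyedState(String key) {
        keyedState.remove(key);
    }

    public static void main(String[] args) {
        BroadcastVsKeyedSketch subtask = new BroadcastVsKeyedSketch();
        subtask.processBroadcastElement("threshold", "10");
        System.out.println(subtask.processElement("1|usa", "threshold"));
        System.out.println(subtask.processElement("2|de", "threshold"));

        subtask.clearKeyedState("1|usa");                     // only key 1|usa loses its state
        subtask.processBroadcastElement("threshold", null);   // every key loses the rule
        System.out.println(subtask.processElement("1|usa", "threshold"));
        System.out.println(subtask.processElement("2|de", "threshold"));
    }
}
```

In a real job each parallel subtask would hold its own copy of the broadcast map, which is why the updates need to be applied identically everywhere.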
There are several examples of working with broadcast state on the Flink training site.
Upvotes: 1