Apache Beam has recently introduced state cells, through StateSpec and the @StateId annotation, with partial support in Apache Flink and Google Cloud Dataflow.
I cannot find any documentation on what happens when this is used with a GlobalWindow. In particular, is there a way to have a "state garbage collection" mechanism that gets rid of state for keys that have not been seen for a while, according to some configuration, while still maintaining a single all-time state for keys that are seen frequently enough?
Or, is the amount of state used in this case going to diverge, with no way to ever reclaim state corresponding to keys that have not been seen in a while?
I am also interested in whether a potential solution would be supported in either Apache Flink or Google Cloud Dataflow.
The Flink and direct runners seem to have some code for "state GC", but I am not sure what it does or whether it is relevant when using a global window.
A Beam runner can automatically garbage-collect state at some point after a window expires: once the input watermark exceeds the end of the window by the allowed lateness, all further input for that window is droppable. The exact details depend on the runner.
As you correctly determined, the global window may never expire, in which case this automatic collection of state is never invoked. For bounded data, including drain scenarios, the global window does eventually expire, but for a perpetual unbounded data source it does not.
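To make the expiry rule above concrete, here is a minimal Java model with no Beam dependency. It is a sketch under stated assumptions, not runner code: the class WindowStateGc, its constants, and the plain-millisecond timestamps are hypothetical, and real runners apply the rule internally with their own details.

```java
/**
 * Hypothetical model (not Beam's API) of when a runner may garbage-collect
 * per-window state. Timestamps are plain milliseconds.
 */
final class WindowStateGc {
    // Assumption: the global window's end is modeled as a far-future
    // constant; Beam uses its own maximum-timestamp values internally.
    static final long GLOBAL_WINDOW_END = Long.MAX_VALUE / 2;

    // Assumption: when a bounded input is exhausted (or a pipeline is
    // drained), the watermark jumps past every window end; modeled here
    // as Long.MAX_VALUE.
    static final long WATERMARK_AT_END_OF_INPUT = Long.MAX_VALUE;

    private WindowStateGc() {}

    /**
     * State for a window is collectible once the input watermark exceeds
     * the end of the window by more than the allowed lateness, because any
     * further input for that window would be droppable anyway.
     */
    static boolean stateCollectible(long inputWatermark, long windowEnd, long allowedLateness) {
        // Subtracting on the watermark side avoids overflow when windowEnd is huge.
        return inputWatermark - allowedLateness > windowEnd;
    }
}
```

For example, a window ending at 60 000 ms with 5 000 ms of allowed lateness becomes collectible once the watermark passes 65 000 ms. The modeled global window end is never reached by an ordinary event-time watermark, but it is reached when the watermark jumps to the end of input, mirroring the bounded and drain cases.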
If you are doing stateful processing on such data in the global window, you can use user-defined timers (via @TimerId, @OnTimer, and TimerSpec; I haven't blogged about these yet) to clear state after a timeout of your choosing. If the state represents an aggregation of some sort, you'll want a timer anyhow to make sure your data is not stranded in state.
Here is a quick example of their use:
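import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Foo, Bizzle and Baz are placeholder types. State and timers are scoped
// per key (and window), so a stateful DoFn needs keyed (KV) input.
new DoFn<KV<String, Foo>, Baz>() {
  private static final String MY_TIMER = "my-timer";
  private static final String MY_STATE = "my-state";

  @StateId(MY_STATE)
  private final StateSpec<ValueState<Bizzle>> myState =
      StateSpecs.value(Bizzle.coder());

  @TimerId(MY_TIMER)
  private final TimerSpec myTimer =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState,
      @TimerId(MY_TIMER) Timer myTimer) {
    bizzleState.write(...);
    // Event-time timer: fires once the watermark passes this instant.
    myTimer.set(c.timestamp().plus(...));
  }

  @OnTimer(MY_TIMER)
  public void onMyTimer(
      OnTimerContext context,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
    // Emit whatever was accumulated, then free the state for this key.
    context.output(... bizzleState.read() ...);
    bizzleState.clear();
  }
}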
There is no automatic garbage collection of state if you use GlobalWindows. Only with a non-global window is state garbage collected, after the watermark passes the end of the window plus the allowed lateness.
If you must work with GlobalWindows, what you can do is manually keep the last update timestamp as state. Then periodically set a timer that checks this timestamp against the current time and deletes the state if necessary. Set this timer when you encounter a key for the first time (which you can detect from the absence of your timestamp state), and re-set it in the @OnTimer method.
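The last-update-timestamp pattern can be modeled without Beam. The following is a minimal, single-process Java sketch, not Beam code: the class IdleKeyExpiry, its methods, the running-count "state", and the millisecond timestamps are all hypothetical. process plays the role of @ProcessElement (it arms a timer only when no last-seen timestamp exists), and advanceTo plays the role of the watermark firing @OnTimer callbacks, which either clear an idle key or re-arm its timer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical single-process model (not Beam code) of expiring idle keys
 * in a global window using a last-seen timestamp plus a periodic timer.
 * Timestamps are plain milliseconds of event time.
 */
final class IdleKeyExpiry {
    /** A pending per-key timer. */
    private static final class PendingTimer {
        final long fireAt;
        final String key;

        PendingTimer(long fireAt, String key) {
            this.fireAt = fireAt;
            this.key = key;
        }
    }

    private final long timeout;
    private final long checkInterval;
    // Stand-in for the user's per-key state: a running element count.
    private final Map<String, Long> counts = new HashMap<>();
    // The extra per-key state: timestamp of the latest update.
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Pending timers, earliest first.
    private final PriorityQueue<PendingTimer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.fireAt, b.fireAt));

    IdleKeyExpiry(long timeout, long checkInterval) {
        this.timeout = timeout;
        this.checkInterval = checkInterval;
    }

    /** Analogue of @ProcessElement for one element of key at time ts. */
    void process(String key, long ts) {
        // No last-seen timestamp: the key is new (or was expired), so arm its timer.
        if (!lastSeen.containsKey(key)) {
            timers.add(new PendingTimer(ts + checkInterval, key));
        }
        counts.merge(key, 1L, Long::sum);
        lastSeen.merge(key, ts, Math::max);
    }

    /** Analogue of the watermark advancing to now and firing due @OnTimer calls. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().fireAt <= now) {
            PendingTimer t = timers.poll();
            long idleFor = t.fireAt - lastSeen.get(t.key);
            if (idleFor >= timeout) {
                // Idle long enough: clear all state for the key; no re-arm.
                counts.remove(t.key);
                lastSeen.remove(t.key);
            } else {
                // Still active: check again one interval later.
                timers.add(new PendingTimer(t.fireAt + checkInterval, t.key));
            }
        }
    }

    boolean hasState(String key) {
        return counts.containsKey(key);
    }

    long count(String key) {
        return counts.getOrDefault(key, 0L);
    }

    int liveKeys() {
        return counts.size();
    }
}
```

Checking once per interval rather than resetting the timer on every element keeps timer writes to at most one per key per interval, at the cost of an idle key lingering up to about one extra interval past the timeout. A key that reappears after being cleared simply starts over with a fresh timer.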