Josh

Reputation: 707

Stateful processing in Beam - is state shared across window panes?

Apache Beam has recently introduced state cells, through StateSpec and the @StateId annotation, with partial support in Apache Flink and Google Cloud Dataflow.

My question is about state garbage collection when a stateful DoFn is used on a windowed stream. Typically, the runner removes (garbage collects) state when the window expires, i.e. when the watermark passes the end of the window plus any allowed lateness. However, consider the case where window panes are triggered early and the fired panes are discarded:

input.apply(Window.<MyElement>into(CalendarWindows.days(1))
  .triggering(AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(
      AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(10))))
  .discardingFiredPanes())
  .apply(ParDo.of(new MyStatefulDofn()));

In this case, would the state for keys whose panes fired early be kept until the window expires? In other words, would subsequent panes in the same window have access to state written by earlier panes?

Upvotes: 2

Views: 1780

Answers (1)

Kenn Knowles

Reputation: 6033

Your triggering configuration does not affect how stateful processing of a ParDo proceeds. The elements are provided immediately to your DoFn without any buffering/triggering and your DoFn directly controls when output occurs.

The fact that you control the output is an important difference between stateful ParDo processing and Combine.perKey governed by triggers. This is why stateful ParDo is often a good choice when triggers are not rich enough for your use case.

I compare stateful ParDo processing with Combine + triggers in some more detail in my post on the Beam blog: https://beam.apache.org/blog/2017/02/13/stateful-processing.html

Now, if there is a GroupByKey or Combine.perKey somewhere upstream from your stateful ParDo, then each input element will be associated with some upstream trigger firing. But this does not affect how the state for your stateful ParDo is managed. Because state persists across elements, and a "pane" arriving from upstream is just another element, state is maintained per key and window until the window fully expires.
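To make that scoping concrete, here is a toy model in plain Java. It is not Beam code and does not use the Beam API: the class ToyStatefulRunner and its methods process and advanceWatermark are hypothetical names invented for illustration, and it assumes one-day fixed windows with zero allowed lateness. It only sketches the behaviour described above: each element is handled immediately, state is looked up by key and window, and state is dropped once the watermark passes the end of the window.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical, not Beam API) of per-key-and-window state. */
class ToyStatefulRunner {
  // Assumption: one-day fixed windows and zero allowed lateness.
  private static final long WINDOW_MILLIS = 24L * 60 * 60 * 1000;

  // State cells, keyed by window start and then by user key.
  private final Map<Long, Map<String, Integer>> state = new HashMap<>();

  /** Handles one element immediately; returns the updated count for its key and window. */
  int process(String key, long eventTimeMillis) {
    long windowStart = Math.floorDiv(eventTimeMillis, WINDOW_MILLIS) * WINDOW_MILLIS;
    return state
        .computeIfAbsent(windowStart, w -> new HashMap<>())
        .merge(key, 1, Integer::sum);
  }

  /** Drops the state of every window whose end is at or before the watermark. */
  void advanceWatermark(long watermarkMillis) {
    state.keySet().removeIf(windowStart -> windowStart + WINDOW_MILLIS <= watermarkMillis);
  }

  public static void main(String[] args) {
    ToyStatefulRunner runner = new ToyStatefulRunner();
    long hour = 60L * 60 * 1000;
    System.out.println(runner.process("user-1", 1 * hour));  // 1: first element in day 0
    System.out.println(runner.process("user-1", 5 * hour));  // 2: same key and window, sees earlier state
    runner.advanceWatermark(24 * hour);                      // day 0 expires, its state is dropped
    System.out.println(runner.process("user-1", 25 * hour)); // 1: day 1 starts from empty state
  }
}
```

Note that nothing in the model refers to panes or triggers: the second element sees the count written by the first simply because both fall in the same key and window.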

Very nice summary leading up to your question, by the way!

Upvotes: 1
