Eugene Zhulenev

Reputation: 9744

Early triggering and chained aggregations in Apache Beam

Is it possible to compose/chain together multiple aggregations with different windowing and triggering in Apache Beam?

Example:

I have an input PCollection<KV<String, Long>>, and I need to compute two sums: one over 1-minute fixed windows, and one over 1-hour sliding windows that advance every minute. I also want speculative (early) results every minute.

Trigger:

Trigger trigger =
    Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                // Speculative every ONE_MINUTE
                .plusDelayOf(ONE_MINUTE))
        // final result past watermark
        .orFinally(AfterWatermark.pastEndOfWindow());

Given an input PCollection<KV<String, Long>> input, I can do this with two separate aggregations:

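PCollection<KV<String, Long>> oneMinSum = input
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(trigger).accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO)) // lateness value assumed
    .apply(Sum.longsPerKey());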

and

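PCollection<KV<String, Long>> slidingSum = input
    .apply(Window.<KV<String, Long>>into(
            SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardMinutes(1)))
        .triggering(trigger).accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO)) // lateness value assumed
    .apply(Sum.longsPerKey());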

But in this case the second aggregation sums exactly the same data that the one-minute aggregation has already summed. If I could use oneMinSum as the input to the sliding aggregation, I would save a lot of CPU. However, this doesn't work: the second aggregation sums over both the early (speculative) panes and the final pane, double-counting and producing an incorrect sliding sum.
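The CPU saving described above rests on a simple identity: if every 1-minute window contributes exactly one final sum, a sliding window's sum is just the sum of the minute sums it covers. The toy model below is plain Java, not Beam code; the Event class, the integer minute timestamps, and the helper names are all hypothetical. It checks that the direct and the chained computation agree when each minute is counted once.

```java
import java.util.List;

// Toy model, not Beam code: events carry an integer event-time minute.
final class SlidingFromMinutes {

    // Hypothetical element type: (event-time minute, value).
    static final class Event {
        final int minute;
        final long value;

        Event(int minute, long value) {
            this.minute = minute;
            this.value = value;
        }
    }

    // Direct approach: scan every raw element for the window [start, start + size).
    static long directSum(List<Event> events, int start, int size) {
        long total = 0;
        for (Event e : events) {
            if (e.minute >= start && e.minute < start + size) {
                total += e.value;
            }
        }
        return total;
    }

    // Stage 1: one final sum per 1-minute fixed window, indexed by minute.
    // Assumes every event minute is in [0, horizon).
    static long[] minuteSums(List<Event> events, int horizon) {
        long[] sums = new long[horizon];
        for (Event e : events) {
            sums[e.minute] += e.value;
        }
        return sums;
    }

    // Stage 2: the sliding window only adds up the minute sums it covers,
    // so each raw element is counted exactly once per sliding window.
    static long chainedSum(long[] minuteSums, int start, int size) {
        long total = 0;
        for (int m = Math.max(start, 0); m < start + size && m < minuteSums.length; m++) {
            total += minuteSums[m];
        }
        return total;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(0, 1), new Event(0, 1), new Event(1, 1), new Event(5, 4));
        long[] perMinute = minuteSums(events, 10);
        // Window [0, 3): both approaches see the three 1s.
        System.out.println(directSum(events, 0, 3) + " " + chainedSum(perMinute, 0, 3)); // prints "3 3"
    }
}
```

Speculative panes break the "counted exactly once" assumption of stage 2, which is where the double counting in the question comes from.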

Full test case is here: https://gist.github.com/anonymous/2920e870a02abcbec51e10c3fd293236

Output

key=a value=1
    window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z)
    pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
key=a value=5
    window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z)
    pane=PaneInfo{timing=EARLY, index=1}
key=a value=7
    window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z)
    pane=PaneInfo{isLast=true, timing=ON_TIME, index=2, onTimeIndex=0}

All the examples I've seen assume that Window.into is applied to a PCollection only once, and that once the aggregates are computed, the results go to some storage (e.g. BigQuery). I've never seen an example of "chaining" aggregations and changing the windowing multiple times.

Is this a valid use case for the Beam programming model? Or does the model assume that Window.into(...).triggering(...) is specified only once?

Upvotes: 3

Views: 1061

Answers (1)

Jiayuan Ma

Reputation: 1901

TL;DR: to fix the issue, use discardingFiredPanes() in the first aggregation.

The trigger you specified behaves differently from what you think. AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE) fires 1 min after the first element in the pane, but because you also use accumulatingFiredPanes() and Repeatedly.forever(...), the trigger will fire

(i) 1 min after the first element for the first time

(ii) every time a new element arrives after (i) has fired, again 1 min after that element
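Rules (i) and (ii) can be sketched as a small model. It is plain Java rather than Beam, tracks a single window in processing-time seconds, ignores the .orFinally(AfterWatermark.pastEndOfWindow()) part, and the class and method names are hypothetical. With elements arriving at 0 s and 120 s (the two addElements calls separated by advanceProcessingTime(TWO_MINUTES) in the test) and a 60 s delay, it predicts early firings at 60 s and 180 s.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Beam code, of
//   Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
// for one window, ignoring the watermark (.orFinally) part.
// Times are processing-time seconds; arrivals must be sorted ascending.
final class RepeatedDelayTrigger {

    static List<Long> firingTimes(long[] arrivals, long delay) {
        List<Long> fires = new ArrayList<>();
        Long armedAt = null; // arrival time of the first element in the current pane
        for (long t : arrivals) {
            // The pending timer came due before this element arrived: fire it.
            if (armedAt != null && armedAt + delay < t) {
                fires.add(armedAt + delay);
                armedAt = null; // Repeatedly.forever resets the inner trigger
            }
            if (armedAt == null) {
                armedAt = t; // first element since the last firing starts a new delay
            }
        }
        if (armedAt != null) {
            fires.add(armedAt + delay); // assumes processing time keeps advancing
        }
        return fires;
    }

    public static void main(String[] args) {
        // Elements at 0 s and 120 s, as in the test's two addElements calls.
        System.out.println(firingTimes(new long[] {0, 120}, 60)); // prints [60, 180]
    }
}
```

Elements that arrive while a timer is already pending (for example at 0 s and 30 s) land in the same pane and do not start a new delay.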

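For your test case, here is what happens to oneMinSum (with accumulatingFiredPanes(), every pane carries the window's running total so far, not only the elements added since the previous firing):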

// t0, t1 and TWO_MINUTES come from the linked test case
TestStream<KV<String, Long>> events = TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
    .advanceWatermarkTo(t0)
    .addElements(KV.of("a", 1L))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 1) because of (i)
    .addElements(KV.of("a", 1L))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 2) because of (ii)
    .advanceWatermarkTo(t1)             // emit another (a, 2) past watermark
    .addElements(KV.of("a", 1L))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 1) because of (i)
    .advanceWatermarkToInfinity();      // emit another (a, 1) past watermark

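(a, 2) and (a, 1) are each emitted twice because, with accumulatingFiredPanes(), the ON_TIME pane re-emits the window's full running total even though an early pane has already reported it.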

For the sliding sums, you get:

  • key=a value=1 because of the early trigger
  • key=a value=5 (1+2+2) because of the early trigger
  • key=a value=7 (1+2+2+1+1) because the window closes
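The 1+2+2+1+1 arithmetic above, and the effect of switching the first aggregation to discardingFiredPanes(), can be reproduced with a small model. It is plain Java, not Beam, and the class and method names are hypothetical. It assumes the downstream sliding window sums every upstream pane it receives, and that an empty on-time pane contributes 0 whether or not the runner actually emits it. In accumulating mode the downstream running totals are 1, 3, 5, 6, 7, consistent with the 1, 5, 7 observed at the downstream firings. In discarding mode they are 1, 2, 2, 3, 3, ending at the correct sum of 3.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Beam code: each upstream 1-minute window fires several panes.
// `runningTotals` holds that window's total at each firing (early..., on-time).
final class ChainedPanes {

    // Value carried by each pane for the given accumulation mode.
    static long[] paneValues(long[] runningTotals, boolean accumulating) {
        long[] out = new long[runningTotals.length];
        long previous = 0;
        for (int i = 0; i < runningTotals.length; i++) {
            // accumulatingFiredPanes(): the whole total so far.
            // discardingFiredPanes(): only what arrived since the previous firing.
            out[i] = accumulating ? runningTotals[i] : runningTotals[i] - previous;
            previous = runningTotals[i];
        }
        return out;
    }

    // A downstream sliding window that adds up every pane it receives, in order,
    // recording its running total after each one.
    static List<Long> downstreamTotals(List<long[]> upstreamWindows, boolean accumulating) {
        List<Long> totals = new ArrayList<>();
        long sum = 0;
        for (long[] runningTotals : upstreamWindows) {
            for (long v : paneValues(runningTotals, accumulating)) {
                sum += v;
                totals.add(sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // From the test case: window 1 fires with totals 1, 2, 2; window 2 with 1, 1.
        List<long[]> upstream = List.of(new long[] {1, 2, 2}, new long[] {1, 1});
        System.out.println(downstreamTotals(upstream, true));  // prints [1, 3, 5, 6, 7]
        System.out.println(downstreamTotals(upstream, false)); // prints [1, 2, 2, 3, 3]
    }
}
```

Discarding works for chaining because the deltas of one window always add up to that window's final total, so the downstream sum counts each raw element exactly once.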

Upvotes: 2
