Is it possible to compose/chain together multiple aggregations with different windowing and triggering in Apache Beam?
Example:
I have an input PCollection<KV<String, Long>>, and I need to compute two sums: one over 1-minute fixed windows and one over 1-hour sliding windows that advance every minute. I also want speculative (early) results every minute.
Trigger:
Duration ONE_MINUTE = Duration.standardMinutes(1); // assumed definition (org.joda.time.Duration)
Trigger trigger =
    Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                // Speculative every ONE_MINUTE
                .plusDelayOf(ONE_MINUTE))
        // final result past watermark
        .orFinally(AfterWatermark.pastEndOfWindow());
Given an input PCollection<KV<String, Long>> input, I can do it with two aggregations:
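PCollection<KV<String, Long>> oneMinSum = input
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(trigger)
        .withAllowedLateness(Duration.ZERO) // assumed value
        .accumulatingFiredPanes()) // mode used in the gist
    .apply(Sum.longsPerKey());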
and
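PCollection<KV<String, Long>> slidingSum = input
    .apply(Window.<KV<String, Long>>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardMinutes(1)))
        .triggering(trigger)
        .withAllowedLateness(Duration.ZERO) // assumed value
        .accumulatingFiredPanes()) // mode used in the gist
    .apply(Sum.longsPerKey());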
But in this case the second aggregation sums exactly the same data that the one-minute sums have already processed. If I could use oneMinSum as the input to the sliding aggregation, I would save a ton of CPU. This doesn't work, though: the second aggregation sums over the early (speculative) panes as well as the final pane, double counting and producing an incorrect sliding sum.
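The idea of feeding oneMinSum into the sliding aggregation relies on sums being decomposable: a sliding-window sum equals the sum of the one-minute sums it covers, as long as each one-minute sum reaches the second stage exactly once. A minimal plain-Java sketch of that equivalence follows (no Beam involved; the class and method names are hypothetical, and only full windows are modeled, unlike Beam's SlidingWindows, which also produces partial windows at the edges):

```java
import java.util.Arrays;

// Hypothetical plain-Java model: sliding sums from raw events vs. from one-minute sums.
class TwoStageSum {
    // Stage 1: one sum per minute bucket (the role oneMinSum plays).
    static long[] perMinuteSums(int[] minutes, long[] values, int numMinutes) {
        long[] sums = new long[numMinutes];
        for (int i = 0; i < minutes.length; i++) {
            sums[minutes[i]] += values[i];
        }
        return sums;
    }

    // Reference: sum the raw events of every full window [s, s + size).
    static long[] slidingFromRaw(int[] minutes, long[] values, int numMinutes, int size) {
        long[] out = new long[numMinutes - size + 1];
        for (int s = 0; s < out.length; s++) {
            for (int i = 0; i < minutes.length; i++) {
                if (minutes[i] >= s && minutes[i] < s + size) {
                    out[s] += values[i];
                }
            }
        }
        return out;
    }

    // Stage 2: each window adds up the one-minute sums it covers, each counted exactly once.
    static long[] slidingFromMinuteSums(long[] perMinute, int size) {
        long[] out = new long[perMinute.length - size + 1];
        for (int s = 0; s < out.length; s++) {
            for (int m = s; m < s + size; m++) {
                out[s] += perMinute[m];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] minutes = {0, 0, 1, 3, 3, 4};
        long[] values = {1, 2, 5, 1, 1, 7};
        long[] perMinute = perMinuteSums(minutes, values, 5);
        System.out.println(Arrays.toString(slidingFromRaw(minutes, values, 5, 3))); // [8, 7, 9]
        System.out.println(Arrays.toString(slidingFromMinuteSums(perMinute, 3)));  // [8, 7, 9]
    }
}
```

The equivalence breaks as soon as a one-minute sum is delivered more than once, which is what happens when every speculative and final pane of oneMinSum is fed downstream.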
Full test case is here: https://gist.github.com/anonymous/2920e870a02abcbec51e10c3fd293236
Output:
key=a value=1 window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z) pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
key=a value=5 window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z) pane=PaneInfo{timing=EARLY, index=1}
key=a value=7 window=[2017-01-01T00:00:00.000Z..2017-01-01T00:10:00.000Z) pane=PaneInfo{isLast=true, timing=ON_TIME, index=2, onTimeIndex=0}
All the examples I've seen assume that Window.into is applied to a PCollection only once, and that after the aggregates are computed, the results go to some storage (e.g. BigQuery). I've never seen an example of "chaining" aggregations and changing the windowing multiple times.
Is this a correct use case for the Beam programming model? Or does the Beam programming model assume that Window.into(...).triggering(...) is specified only once?
TL;DR: to fix this, use discardingFiredPanes() in the first aggregation.
The trigger you specified behaves differently from what you think. AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE) fires one minute after the first element in the pane, and because it is wrapped in Repeatedly.forever(...), it resets after every firing, so the trigger fires
(i) one minute after the first element, the first time
(ii) again one minute after the first new element that arrives after the previous firing
Because you also use accumulatingFiredPanes(), each of these firings emits the window's running total rather than only the elements that arrived since the last firing.
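To make the firing schedule concrete, here is a small plain-Java model of Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)) for a single window. It is a sketch under stated assumptions, not Beam's implementation: the class and method names are hypothetical, arrival times are processing times in seconds sorted ascending, the returned values are the times the timer targets (with TestStream's processing-time jumps, the firing is observed when processing time is advanced past them), and the watermark part, orFinally(AfterWatermark.pastEndOfWindow()), is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: the first element after a reset arms a timer at arrival + delay;
// when the timer fires, the trigger resets and waits for the next element.
class FiringModel {
    static List<Long> firingTimes(long[] arrivals, long delay) {
        List<Long> fires = new ArrayList<>();
        boolean armed = false; // true once an element has arrived since the last firing
        long fireAt = 0;
        for (long t : arrivals) {
            if (armed && t >= fireAt) { // the timer expired before this element arrived
                fires.add(fireAt);
                armed = false;
            }
            if (!armed) { // first element after a reset starts a new delay
                armed = true;
                fireAt = t + delay;
            }
        }
        if (armed) {
            fires.add(fireAt); // assumes processing time keeps advancing
        }
        return fires;
    }

    public static void main(String[] args) {
        // Like the test case: an element, two minutes pass, another element, two minutes pass.
        System.out.println(firingTimes(new long[] {0, 120}, 60)); // [60, 180]
        // Elements arriving while the timer is armed do not add extra firings.
        System.out.println(firingTimes(new long[] {0, 30, 45, 70}, 60)); // [60, 130]
    }
}
```

The second call shows why the early results are not "every minute" in general: a firing only happens one delay after the first element following a reset.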
For your test case, I annotated what happens to oneMinSum:
TestStream<KV<String, Long>> events = TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())) // coders assumed
    .advanceWatermarkTo(t0)
    .addElements(KV.of("a", 1L)) // 1L, not 1: the values are Long
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 1) because of (i)
    .addElements(KV.of("a", 1L))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 2) because of (ii)
    .advanceWatermarkTo(t1) // emit (a, 2) again: ON_TIME pane as the watermark passes the end of the window
    .addElements(KV.of("a", 1L))
    .advanceProcessingTime(TWO_MINUTES) // emit (a, 1) because of (i), in the next window
    .advanceWatermarkToInfinity(); // emit (a, 1) again: ON_TIME pane of that window
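The difference between the two accumulation modes can be modeled in plain Java by listing, for each window, the values that arrived between consecutive firings. This is a hypothetical sketch (class and method names are illustrative, not Beam API). It assumes the ON_TIME pane fires even when nothing new arrived, which is Beam's default on-time behavior, and it models that empty pane as 0 in discarding mode; whether such a pane is emitted as 0 or not at all, downstream totals are the same.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one window's panes. Each inner list holds the values
// that arrived between two consecutive firings of that window.
class PaneModel {
    static List<Long> emit(List<List<Long>> arrivalsPerFiring, boolean accumulating) {
        List<Long> panes = new ArrayList<>();
        long total = 0;
        for (List<Long> batch : arrivalsPerFiring) {
            long delta = 0;
            for (long v : batch) {
                delta += v;
            }
            total += delta;
            // accumulating: running total so far; discarding: only what arrived since the last firing
            panes.add(accumulating ? total : delta);
        }
        return panes;
    }

    public static void main(String[] args) {
        // Window of the first two elements: EARLY, EARLY, then ON_TIME with nothing new.
        List<List<Long>> first = List.of(List.of(1L), List.of(1L), List.<Long>of());
        // Window of the third element: EARLY, then ON_TIME with nothing new.
        List<List<Long>> second = List.of(List.of(1L), List.<Long>of());
        System.out.println(emit(first, true) + " " + emit(second, true));   // [1, 2, 2] [1, 1]
        System.out.println(emit(first, false) + " " + emit(second, false)); // [1, 1, 0] [1, 0]
    }
}
```

The accumulating output reproduces the annotated emissions above; in discarding mode each element contributes to exactly one pane, so a downstream sum counts it once.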
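The double emission of (a, 2) and (a, 1) is explained here. In short, by default the watermark trigger fires an ON_TIME pane when the window closes even if no new elements arrived since the last early firing, and in accumulating mode that pane repeats the window's full total.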
For the sliding sum, you get:
key=a value=1 because of an early firing
key=a value=5 (1+2+2) because of an early firing
key=a value=7 (1+2+2+1+1) because of the window closing
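As an arithmetic check of the TL;DR fix, the final sliding value is the sum of every pane oneMinSum emits, grouped below by one-minute window. The discarding line is a sketch that assumes the ON_TIME panes carrying no new elements contribute 0; with discardingFiredPanes() each of the three input elements is counted once.

```latex
\begin{aligned}
\text{accumulating:}\quad & (1 + 2 + 2) + (1 + 1) = 7 \\
\text{discarding:}\quad & (1 + 1 + 0) + (1 + 0) = 3 = 1 + 1 + 1
\end{aligned}
```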