sharp-z

Reputation: 21

Apache Beam TestStream with late element

I'm trying to use TestStream to experiment with how late elements are handled, but I got some interesting and confusing behavior.

Specifically, I add an element "2" with a timestamp inside a window (windowTwo). Then I move the watermark past the end of that window, but not past endOfWindow + allowed lateness. Finally, I add another element "3" whose timestamp is also inside windowTwo.

The interesting and confusing thing is this: I expected the sum of all elements in windowTwo to be 5, but the test fails with

Expected: iterable over [<5>] in any order, but: Not matched: <2>

However, if I change the expected sum from 5 to 2, it still fails, this time with

Expected: iterable over [<2>] in any order, but: Not matched: <5>

What's going on???

import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class BeamAppTest {
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    @Category(NeedsRunner.class)
    public void testApp() {
        final Duration windowLengthMin = Duration.standardMinutes(10);
        final Duration latenessMin = Duration.standardMinutes(5);
        final Duration oneMin = Duration.standardMinutes(1);

        final Instant windowOneStart = new Instant(0L).plus(Duration.standardMinutes(20));
        final Instant windowOneEnd = windowOneStart.plus(windowLengthMin);
        final IntervalWindow windowOne = new IntervalWindow(windowOneStart, windowOneEnd);

        final Instant windowTwoStart = windowOneEnd;
        final Instant windowTwoEnd = windowTwoStart.plus(windowLengthMin);
        final IntervalWindow windowTwo = new IntervalWindow(windowTwoStart, windowTwoEnd);

        TestStream<Integer> testStream = TestStream.create(BigEndianIntegerCoder.of())
            .addElements(TimestampedValue.of(1, windowOneStart.plus(oneMin))) // early window one
            .advanceWatermarkTo(windowOneEnd)                                 // watermark passes window one
            .addElements(TimestampedValue.of(2, windowTwoStart.plus(oneMin))) // early window two
            .advanceWatermarkTo(windowTwoEnd.plus(latenessMin).minus(oneMin)) // watermark passes window two, but not its allowed lateness
            .addElements(TimestampedValue.of(3, windowTwoStart.plus(oneMin))) // late window two
            .advanceProcessingTime(oneMin.plus(oneMin))
            .advanceWatermarkToInfinity();

        PCollection<Integer> means = pipeline.apply(testStream).apply(new CalSum(windowLengthMin, latenessMin));

        PAssert.that(means)
            .inWindow(windowOne)
            .containsInAnyOrder(1);

        PAssert.that(means)
            .inWindow(windowTwo)
            .containsInAnyOrder(2);  // change the 2 to 5 here to see magic!!!

        pipeline.run().waitUntilFinish();
    }

    static class CalSum extends PTransform<PCollection<Integer>, PCollection<Integer>> {
        private final Duration WINDOW_LENGTH_MIN;
        private final Duration LATENESS_MIN;

        CalSum(Duration windowLengthMin, Duration latenessMin) {
            WINDOW_LENGTH_MIN = windowLengthMin;
            LATENESS_MIN = latenessMin;
        }

        @Override
        public PCollection<Integer> expand(PCollection<Integer> input) {
            return input
                .apply(Window
                    .<Integer>into(FixedWindows.of(WINDOW_LENGTH_MIN))
                    .withAllowedLateness(LATENESS_MIN)
                    .accumulatingFiredPanes()  // accumulating trigger
                    .triggering(AfterWatermark.pastEndOfWindow()  // trigger at end of window
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(2)))  // trigger every 2 min within the window
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1))))) // trigger every 1 min after the window
                .apply(Sum.integersGlobally().withoutDefaults());
        }
    }
}

Upvotes: 2

Views: 448

Answers (1)

Thomas Groh

Reputation: 511

As written, given the timestamps of the elements and the watermark positions at which they arrive, windowTwo produces two panes: one whose sum is 2 and one whose sum is 5. This is a result of the triggering you've set up. Input 2 arrives with a timestamp of windowTwoStart plus one minute while the watermark is still before the end of windowTwo, so it is on time. Then the watermark advances past the end of windowTwo, which causes the AfterWatermark trigger to fire and emit the on-time pane containing 2.

After this, input 3 arrives. It arrives after the watermark has passed the end of its window (so the element is late), but before the watermark has passed the end of the window plus the allowed lateness (so the element is not droppable). As a result, when the watermark advances again, the element is emitted together with the earlier 2 (because of the accumulating mode you chose), and the two are combined into the 5 you observe.
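To make the on-time / late / droppable distinction concrete, here is a minimal sketch that replays the watermark positions from the test in the question. It assumes a simplified model in whole minutes (windowTwo spans minutes 30 to 40 and the allowed lateness is 5 minutes). `LatenessModel` and `classify` are hypothetical names, not Beam APIs; Beam itself makes this comparison in milliseconds against the window's maximum timestamp.

```java
// Simplified, minute-granularity model (hypothetical, not a Beam API) of how
// an element is classified relative to its window at the moment it arrives.
final class LatenessModel {

    enum Status { ON_TIME, LATE_BUT_KEPT, DROPPABLE }

    // watermark: input watermark (minutes) when the element arrives.
    // windowEnd: exclusive end of the element's window (minutes).
    // allowedLateness: allowed lateness configured on the window (minutes).
    static Status classify(long watermark, long windowEnd, long allowedLateness) {
        if (watermark < windowEnd) {
            return Status.ON_TIME;        // watermark has not passed the end of the window
        } else if (watermark < windowEnd + allowedLateness) {
            return Status.LATE_BUT_KEPT;  // late, but still within the allowed lateness
        } else {
            return Status.DROPPABLE;      // past end of window plus allowed lateness
        }
    }

    public static void main(String[] args) {
        long windowTwoEnd = 40;  // windowTwo = [30, 40) minutes
        long lateness = 5;
        // Element 2 is added after advanceWatermarkTo(windowOneEnd), so watermark = 30.
        System.out.println("element 2: " + classify(30, windowTwoEnd, lateness)); // element 2: ON_TIME
        // Element 3 is added after advanceWatermarkTo(40 + 5 - 1), so watermark = 44.
        System.out.println("element 3: " + classify(44, windowTwoEnd, lateness)); // element 3: LATE_BUT_KEPT
    }
}
```

Moving the second watermark advance to 45 minutes or later would make element 3 droppable in this model, and it would never reach the Sum.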

The on-time pane (which you can match with PAssert.that(means).inOnTimePane(windowTwo)) contains only the value 2. Over the lifetime of the window, both 2 and 5 are produced, so an inWindow(windowTwo) assertion is checked against all of them and must expect both: containsInAnyOrder(2, 5).
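A similar sketch shows why the accumulation mode decides what each pane contains. This hypothetical `PaneModel` (not a Beam API) assumes windowTwo fires exactly twice, once on time after element 2 and once late after element 3, and computes the value Sum would emit for each firing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not a Beam API) of the per-pane output of a Sum
// for a single window that fires several times.
final class PaneModel {

    // Each inner list holds the elements that arrived since the previous firing.
    // Returns one Sum output per firing.
    static List<Integer> firePanes(List<List<Integer>> arrivalsPerFiring, boolean accumulating) {
        List<Integer> outputs = new ArrayList<>();
        int carried = 0;  // running total kept across firings in accumulating mode
        for (List<Integer> batch : arrivalsPerFiring) {
            int batchSum = 0;
            for (int v : batch) {
                batchSum += v;
            }
            if (accumulating) {
                carried += batchSum;
                outputs.add(carried);   // pane re-emits everything seen so far
            } else {
                outputs.add(batchSum);  // pane emits only what is new since the last firing
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // windowTwo: 2 arrives before the on-time firing, 3 before the late firing.
        List<List<Integer>> windowTwo = List.of(List.of(2), List.of(3));
        System.out.println("accumulating: " + firePanes(windowTwo, true));   // accumulating: [2, 5]
        System.out.println("discarding:   " + firePanes(windowTwo, false));  // discarding:   [2, 3]
    }
}
```

Because inWindow sees every pane, the accumulating output [2, 5] is what `PAssert.that(means).inWindow(windowTwo).containsInAnyOrder(2, 5)` has to match. With `discardingFiredPanes()` instead, this model would predict panes carrying 2 and 3.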

Upvotes: 1
