How to unit test early triggering in Apache Beam (Python SDK)

I would like to create a unit test for a streaming pipeline with early triggering. The example pipeline looks like this:

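import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.utils.timestamp import Duration


class CalculateTeamScores(beam.PTransform):
    def expand(self, scores):
        return scores \
           | "windowing scores" >> beam.WindowInto(
                beam.window.FixedWindows(120),  # 2-minute fixed windows
                # early firings once at least one element has arrived, then at the watermark
                trigger=AfterWatermark(early=AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
                allowed_lateness=Duration(seconds=3600)) \
           | "preparing scores for combining" >> beam.Map(
                lambda team_score: (team_score['team'], team_score['score'])) \
           | "calculating team scores" >> beam.CombinePerKey(sum) \
           | "forming the result" >> beam.ParDo(FormatResult())  # FormatResult is a DoFn defined elsewhere (not shown)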
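
To make the windowing concrete, here is a minimal pure-Python sketch of how a 120-second fixed window is derived from an event timestamp. The function names `fixed_window_bounds` and `seconds` are hypothetical helpers, not Beam APIs, and timestamps are simplified to seconds since midnight rather than real epoch timestamps.

```python
def seconds(hhmmss):
    """Convert 'HH:MM:SS' to seconds since midnight (a simplification for illustration)."""
    h, m, s = (int(part) for part in hhmmss.split(":"))
    return h * 3600 + m * 60 + s


def fixed_window_bounds(timestamp, size=120, offset=0):
    """Return (start, end) of the fixed window that contains `timestamp`.

    The window is half-open: start <= timestamp < end.
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# Both example elements should land in the same 12:00:00-12:02:00 window.
first = fixed_window_bounds(seconds("12:00:30"))
second = fixed_window_bounds(seconds("12:01:50"))
print(first, second)
```

Under these assumptions both scores fall into one window, which is why the sums for the 'red' team are expected to share the same window bounds.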

I have written the following test:

from unittest import TestCase

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that, equal_to

# timestamp_from_datetime and team_score are my own helpers (not shown)


class TestCalculateTeamScores(TestCase):

    def test_should_sum_score_for_each_team(self):
        # given
        p = TestPipeline()

        scores_stream = p | "loading score stream" >> TestStream()\
            .advance_processing_time(advance_by=timestamp_from_datetime('12:30:00', as_int=True))\
            .advance_watermark_to(new_watermark=timestamp_from_datetime('12:00:00'))\
            .add_elements(elements=[team_score('red', 5)],
                          event_timestamp=timestamp_from_datetime('12:00:30'))\
            .advance_processing_time(advance_by=500)\
            .add_elements(elements=[team_score('red', 9)],
                          event_timestamp=timestamp_from_datetime('12:01:50'))

        # when
        result = scores_stream | 'calculating team scores' >> CalculateTeamScores()

        # then
        assert_that(result, equal_to([
            {
                'team': 'red',
                'score': 5,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
            {
                'team': 'red',
                'score': 14,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
        ]))
        p.run()

As you can see, I expect to get an early pane with score = 5, followed by a pane with score = 14. Unfortunately, I can't get that result. Instead, I get 2 panes, both with score = 14.
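
To spell out the expectation, here is a rough pure-Python simulation of the sums that per-element early firings would produce. It is only a sketch: `early_pane_sums` is a hypothetical helper, it assumes a pane fires after every single element, and it ignores the on-time pane at the watermark as well as any bundling the runner may do.

```python
def early_pane_sums(scores, accumulating=True):
    """Return the sum emitted by each early pane, firing once per element.

    accumulating=True  keeps earlier elements in later panes (ACCUMULATING).
    accumulating=False emits only what arrived since the last firing (DISCARDING).
    """
    panes = []
    running = 0
    for score in scores:
        running = running + score if accumulating else score
        panes.append(running)
    return panes


accumulating_panes = early_pane_sums([5, 9])
discarding_panes = early_pane_sums([5, 9], accumulating=False)
print(accumulating_panes, discarding_panes)
```

With accumulation, the first pane carries only the first score and the second carries the running total, which matches the two dictionaries in the assertion above.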

I use the TestStream class, which is not documented and is not part of the public interface, but it looks like something that suits my case.

Upvotes: 4

Views: 546

Answers (1)

I couldn't get the early pane because the TestPipeline was set up incorrectly: it was not running in streaming mode. It should be:

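from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # run the test pipeline in streaming mode
p = TestPipeline(options=options)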

With that change, it looks like TestStream works fine for testing early triggering.

Upvotes: 2
