I would like to write a unit test for a streaming pipeline with early triggering. The example pipeline looks like this:
```python
import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterWatermark)
from apache_beam.utils.timestamp import Duration


# FormatResult is a DoFn defined elsewhere (not shown).
class CalculateTeamScores(beam.PTransform):
    def expand(self, scores):
        return (
            scores
            | "windowing scores" >> beam.WindowInto(
                beam.window.FixedWindows(120),
                trigger=AfterWatermark(early=AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
                allowed_lateness=Duration(seconds=3600))
            | "preparing scores for combining" >> beam.Map(
                lambda team_score: (team_score['team'], team_score['score']))
            | "calculating team scores" >> beam.CombinePerKey(sum)
            | "forming the result" >> beam.ParDo(FormatResult()))
```
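The two elements used in the test below (event times 12:00:30 and 12:01:50) should land in the same two-minute window. The sketch mirrors the alignment rule used by fixed windows with the default zero offset, `start = t - (t - offset) % size`. `fixed_window` and `at` are hypothetical helpers, and `at` assumes the clock times refer to 2020-01-01 UTC.

```python
from datetime import datetime, timezone
from typing import Tuple

WINDOW_SIZE = 120  # seconds, matching beam.window.FixedWindows(120)


def fixed_window(ts: int, size: int = WINDOW_SIZE, offset: int = 0) -> Tuple[int, int]:
    """Return [start, end) of the fixed window containing Unix timestamp ts."""
    start = ts - (ts - offset) % size
    return start, start + size


def at(hms: str) -> int:
    """Hypothetical helper: HH:MM:SS on 2020-01-01 UTC as Unix seconds."""
    dt = datetime.strptime("2020-01-01 " + hms, "%Y-%m-%d %H:%M:%S")
    return int(dt.replace(tzinfo=timezone.utc).timestamp())


for t in ("12:00:30", "12:01:50"):
    start, end = fixed_window(at(t))
    print(t, "->",
          datetime.fromtimestamp(start, timezone.utc).strftime("%H:%M:%S"),
          datetime.fromtimestamp(end, timezone.utc).strftime("%H:%M:%S"))
```

Both lines print the window 12:00:00 to 12:02:00, which is the `eventTime` pair the test expects for every pane.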
I have written the following test:
```python
from unittest import TestCase

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that, equal_to

# team_score() and timestamp_from_datetime() are helpers defined elsewhere (not shown).


class TestCalculateTeamScores(TestCase):
    def test_should_sum_score_for_each_team(self):
        # given
        p = TestPipeline()
        scores_stream = p | "loading score stream" >> (
            TestStream()
            .advance_processing_time(
                advance_by=timestamp_from_datetime('12:30:00', as_int=True))
            .advance_watermark_to(new_watermark=timestamp_from_datetime('12:00:00'))
            .add_elements(elements=[team_score('red', 5)],
                          event_timestamp=timestamp_from_datetime('12:00:30'))
            .advance_processing_time(advance_by=500)
            .add_elements(elements=[team_score('red', 9)],
                          event_timestamp=timestamp_from_datetime('12:01:50')))
        # when
        result = scores_stream | 'calculating team scores' >> CalculateTeamScores()
        # then
        assert_that(result, equal_to([
            {
                'team': 'red',
                'score': 5,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
            {
                'team': 'red',
                'score': 14,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
        ]))
        p.run()
```
As you can see, I expect an early pane with score 5 followed by a pane with score 14. Unfortunately, that is not what I get: instead I get two panes, both with score 14.
I'm using the `TestStream` class, which is not documented and not part of the public interface, but it looks like it suits my case.
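To make the expected pane values concrete, here is a toy model in plain Python. It is not Beam code: it assumes one early firing after every element (what `AfterCount(1)` asks for) and ignores the on-time and late panes the watermark trigger may also produce. `early_pane_values` is a hypothetical name.

```python
from typing import List


def early_pane_values(scores: List[int], accumulating: bool) -> List[int]:
    """Toy model, not Beam: one early pane fires after each element."""
    panes = []
    running_total = 0
    for score in scores:
        if accumulating:
            # ACCUMULATING: each pane re-emits everything seen so far in the window.
            running_total += score
            panes.append(running_total)
        else:
            # DISCARDING: each pane holds only the elements since the last firing.
            panes.append(score)
    return panes


print(early_pane_values([5, 9], accumulating=True))   # [5, 14]
print(early_pane_values([5, 9], accumulating=False))  # [5, 9]
```

Under this model, seeing 14 in every pane means both elements were already in the window when the first pane fired.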
I couldn't get the early pane because the `TestPipeline` was not set up to run in streaming mode. It should be created like this:
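```python
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
```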
With that change, `TestStream` works fine for testing early triggering.