Reputation: 986
I am trying to write tests for late and out-of-order data in an Apache Beam 2.35.0 pipeline written using the Python SDK. I'm following the Testing Unbounded Pipelines in Apache Beam blog post and trying to map the Java examples into Python by reading the Python code.
Here is my first attempt. The pipeline extracts the time from each message and uses it to rewrite the element's timestamp. The comments explain my understanding of what's going on:
import apache_beam as beam
import pandas as pd
from apache_beam.testing import test_stream
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def test_out_of_order_data():
    stream = test_stream.TestStream()
    # First element arrives 15 seconds after its "data" time.
    stream.advance_watermark_to(ts("12:00:45"))
    stream.add_elements([{"time": ts("12:00:30"), "value": 1}])
    # Next element arrives 4 seconds after its data time and advances the output watermark past the previous window.
    stream.advance_watermark_to(ts("13:01:05"))
    stream.add_elements([{"time": ts("13:01:01"), "value": 2}])
    # Late element from the first window arrives.
    stream.advance_watermark_to(ts("13:01:06"))
    stream.add_elements([{"time": ts("12:00:31"), "value": 3}])
    stream.advance_watermark_to_infinity()

    with TestPipeline() as p:
        output = (
            p
            | "stream" >> stream
            | "rewrite timestamps" >> beam.Map(lambda e: beam.window.TimestampedValue(e, e["time"]))
            | "window" >> beam.WindowInto(beam.window.FixedWindows(60))
            | "add dummy key" >> beam.Map(lambda elem: (None, elem))
            | "group" >> beam.GroupByKey()
            | "remove dummy key" >> beam.Map(lambda elem: elem[1])
            | "sum" >> beam.Map(lambda elems: sum([e["value"] for e in elems]))
        )
        # I expected the output watermark to be advanced by the data, the late element to be discarded,
        # and `output` to be [1, 2]. It is not. This test passes.
        assert_that(output, equal_to([4, 2]))


def ts(time_str):
    return pd.Timestamp(f"2000-01-01 {time_str}").timestamp()
That did not behave as I expected. The late message at 13:01:06 watermark time still contributes to the 12:00 window.
The Java example has some assertions that reference windows directly. I thought that might be what I'm missing above, so I tried to write a minimal test using that:
def test_window_simple_with_global_false():
    stream = test_stream.TestStream()
    stream.advance_watermark_to(ts("12:00:30"))
    stream.add_elements([1])
    stream.advance_watermark_to_infinity()

    with TestPipeline() as p:
        output = (
            p
            | "stream" >> stream
            | "window" >> beam.WindowInto(beam.window.FixedWindows(60))
            | "add dummy key" >> beam.Map(lambda elem: (None, elem))
            | "group" >> beam.GroupByKey()
            | "remove dummy key" >> beam.Map(lambda elem: elem[1])
        )
        assert_that(
            output,
            equal_to({beam.window.IntervalWindow(ts("12:00"), ts("12:01")): [1]}),
            use_global_window=False,
        )
I tried some variations of this (removing the "add dummy key/group/remove dummy key" steps, adding reify_windows=True), but it looks like the matcher is expecting a window and receiving a list:
ERROR apache_beam.runners.direct.executor:executor.py:379 Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f44cd345400>, due to an exception.
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 843, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/home/aschmied/code/cme-injestion/cme-real-time/.tox/unit/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1636, in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File "/home/aschmied/code/cme-injestion/cme-real-time/.tox/unit/lib/python3.8/site-packages/apache_beam/testing/util.py", line 164, in _equal
sorted_actual = sorted(actual)
File "/home/aschmied/code/cme-injestion/cme-real-time/.tox/unit/lib/python3.8/site-packages/apache_beam/transforms/window.py", line 232, in __lt__
if self.end != other.end:
AttributeError: 'list' object has no attribute 'end'
Any help greatly appreciated.
Upvotes: 1
Views: 953
Reputation: 5104
For your first question: there is no guarantee that "late" data will be dropped. Rather, the runner is allowed to drop data that arrives after the watermark has passed the end of its window (plus any allowed lateness). In practice, in a distributed runner, whether data is late and by how much is often a matter of non-deterministic timing. I agree that it would be nicer if the local runner enforced this, though.
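To make the lateness condition concrete, here is a minimal plain-Python sketch (no Beam involved) of how a fixed window is assigned and when an element becomes eligible to be dropped. The helper names `seconds`, `fixed_window`, and `may_be_dropped` are hypothetical, times are seconds since midnight, and the comparison is a simplification of the real rule, which is based on the window's maximum timestamp:

```python
WINDOW_SIZE = 60  # seconds, matching FixedWindows(60) in the question


def seconds(h, m, s):
    """Hypothetical helper: seconds since midnight for h:m:s."""
    return h * 3600 + m * 60 + s


def fixed_window(timestamp, size=WINDOW_SIZE):
    """Return (start, end) of the fixed window containing `timestamp`."""
    start = timestamp - timestamp % size
    return (start, start + size)


def may_be_dropped(timestamp, watermark, size=WINDOW_SIZE, allowed_lateness=0):
    """True if the watermark has passed the end of the element's window
    plus the allowed lateness. This means a runner is *allowed* to drop
    the element, not that it necessarily will (simplified rule)."""
    _, end = fixed_window(timestamp, size)
    return watermark >= end + allowed_lateness


# Element with data time 12:00:30, seen while the watermark is at 12:00:45.
on_time = may_be_dropped(seconds(12, 0, 30), watermark=seconds(12, 0, 45))

# Element with data time 12:00:31, seen while the watermark is at 13:01:06.
late = may_be_dropped(seconds(12, 0, 31), watermark=seconds(13, 1, 6))
```

Here `on_time` is `False` and `late` is `True`: the third element in the question is droppable under this rule, but that only permits the runner to discard it.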
As for testing with reify_windows=True, I've found it much easier to do something like
assert_that(
    actual | beam.Map(lambda x, w=beam.DoFn.WindowParam: (x, w)),
    equal_to([(some_value, some_window)]),
)
Upvotes: 1