Reputation: 1
I am learning about windowing, the different kinds of triggers, and late data. I am following the tutorials in the Apache Beam Playground (https://play.beam.apache.org/), using the Python SDK; the topic is WindowAccumulationMode.
I have changed the TestStream input in their generate_event.py as follows:
import apache_beam as beam
from apache_beam.testing.test_stream import TestStream
from datetime import datetime
import pytz
class GenerateEvent(beam.PTransform):
    """Test source that replays a fixed, timestamped sequence of letters.

    The stream emits the letters ``A`` through ``T``, one element per
    ``add_elements`` call, with event timestamps at seconds 1 through 20 of
    2021-03-01 00:00 UTC.  Before each element whose second is a multiple of
    five (5, 10, 15, 20) the watermark is advanced to that same instant.
    After the last element the watermark is advanced to second 25 and then
    to infinity.
    """

    @staticmethod
    def sample_data():
        """Return a new ``GenerateEvent`` transform."""
        return GenerateEvent()

    def expand(self, input):
        """Apply the scripted ``TestStream`` to ``input`` and return the result."""

        def at_second(second):
            # POSIX timestamp for 2021-03-01 00:00:<second> UTC.
            return datetime(
                2021, 3, 1, 0, 0, second, 0, tzinfo=pytz.UTC).timestamp()

        stream = TestStream()
        for second, letter in enumerate("ABCDEFGHIJKLMNOPQRST", start=1):
            if second % 5 == 0:
                # The watermark moves first; the element carrying the same
                # timestamp is added to the stream afterwards.
                stream = stream.advance_watermark_to(at_second(second))
            stream = stream.add_elements(
                elements=[letter], event_timestamp=at_second(second))

        stream = stream.advance_watermark_to(at_second(25))
        stream = stream.advance_watermark_to_infinity()
        return input | stream
and the beam code is below:
import apache_beam as beam
from generate_event import GenerateEvent
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.utils.timestamp import Duration
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms.util import LogElements
class CountEventsWithAccumulating(beam.PTransform):
    """Key, window and group incoming events using ACCUMULATING panes.

    Every element is paired with the constant key ``'key'``, windowed with
    ``FixedWindows(5)`` using an ``AfterWatermark`` trigger, ACCUMULATING
    mode and an allowed lateness of zero seconds, and finally grouped by key.
    """

    def expand(self, events):
        """Return ``events`` keyed, windowed and grouped by key."""
        # A single constant key so that GroupByKey gathers all elements of a
        # window into one list.
        keyed = events | beam.Map(lambda x: ('key', x))

        # NOTE(review): the output pasted with this code shows 10-second
        # windows, while the size given here is 5 -- confirm which is meant.
        windowing = beam.WindowInto(
            FixedWindows(5),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.ACCUMULATING,
            allowed_lateness=Duration(seconds=0))
        windowed = keyed | windowing

        return windowed | beam.GroupByKey()
# Build the pipeline options: unsafe triggers are allowed and the pipeline
# is flagged as streaming.
options = PipelineOptions(allow_unsafe_triggers=True)
standard_options = options.view_as(StandardOptions)
standard_options.streaming = True

# Generate the scripted events, group them per window, and log each result
# together with its window.
with beam.Pipeline(options=options) as p:
    generated = p | GenerateEvent.sample_data()
    grouped = generated | CountEventsWithAccumulating()
    (grouped | LogElements(with_window=True))
The processing has been started
('key', ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']), window(start=2021-03-01T00:00:00Z, end=2021-03-01T00:00:10Z)
('key', ['J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S']), window(start=2021-03-01T00:00:10Z, end=2021-03-01T00:00:20Z)
('key', ['T']), window(start=2021-03-01T00:00:20Z, end=2021-03-01T00:00:30Z)
As far as I know, a window triggers its result when the watermark arrives. As you can see, the first watermark is at 5 seconds, so the first pane should contain only [A, B, C, D]. Although the fixed window is 10 seconds long, the watermark has arrived, so it should emit only A, B, C, D, and the others ['E', 'F', 'G', 'H', 'I'] should be dropped, because we advance the watermark before E, F, G, H, I arrive in the stream.
Can anyone explain?
Upvotes: 0
Views: 25