amarjeet kushwaha

Reputation: 1

Apache Beam not emitting the results I expect from my window and trigger strategy. Can anyone explain?

I am learning about windowing, the different kinds of triggers, and late data. I am following the Python SDK tutorials on the Apache Beam Playground (https://play.beam.apache.org/), specifically the topic WindowAccumulationMode.

I changed the TestStream input in their generate_event.py as follows:


import apache_beam as beam
from apache_beam.testing.test_stream import TestStream
from datetime import datetime
import pytz


class GenerateEvent(beam.PTransform):

    @staticmethod
    def sample_data():
        return GenerateEvent()

    def expand(self, input):

        return (input
                | TestStream()
                    .add_elements(elements=["A"], event_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["B"], event_timestamp=datetime(2021, 3, 1, 0, 0, 2, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["C"], event_timestamp=datetime(2021, 3, 1, 0, 0, 3, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["D"], event_timestamp=datetime(2021, 3, 1, 0, 0, 4, 0, tzinfo=pytz.UTC).timestamp())
                    .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["E"], event_timestamp=datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["F"], event_timestamp=datetime(2021, 3, 1, 0, 0, 6, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["G"], event_timestamp=datetime(2021, 3, 1, 0, 0, 7, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["H"], event_timestamp=datetime(2021, 3, 1, 0, 0, 8, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["I"], event_timestamp=datetime(2021, 3, 1, 0, 0, 9, 0, tzinfo=pytz.UTC).timestamp())
                    .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["J"], event_timestamp=datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["K"], event_timestamp=datetime(2021, 3, 1, 0, 0, 11, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["L"], event_timestamp=datetime(2021, 3, 1, 0, 0, 12, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["M"], event_timestamp=datetime(2021, 3, 1, 0, 0, 13, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["N"], event_timestamp=datetime(2021, 3, 1, 0, 0, 14, 0, tzinfo=pytz.UTC).timestamp())
                    .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["O"], event_timestamp=datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["P"], event_timestamp=datetime(2021, 3, 1, 0, 0, 16, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["Q"], event_timestamp=datetime(2021, 3, 1, 0, 0, 17, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["R"], event_timestamp=datetime(2021, 3, 1, 0, 0, 18, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["S"], event_timestamp=datetime(2021, 3, 1, 0, 0, 19, 0, tzinfo=pytz.UTC).timestamp())
                    .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
                    .add_elements(elements=["T"], event_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
                    .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).timestamp())
                    .advance_watermark_to_infinity())

The Beam pipeline code is below:



import apache_beam as beam
from generate_event import GenerateEvent
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.utils.timestamp import Duration
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms.util import LogElements


class CountEventsWithAccumulating(beam.PTransform):
  def expand(self, events):
    return (events
            # Put every element under one key so GroupByKey collects them per window.
            | beam.Map(lambda x: ('key', x))
            # 10-second fixed windows, matching the output and description below.
            | beam.WindowInto(FixedWindows(10),
                              trigger=AfterWatermark(),
                              accumulation_mode=AccumulationMode.ACCUMULATING,
                              allowed_lateness=Duration(seconds=0))
            | beam.GroupByKey())


options = PipelineOptions(allow_unsafe_triggers=True)
options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=options) as p:
  (p | GenerateEvent.sample_data()
   | CountEventsWithAccumulating()
   | LogElements(with_window=True))

When I run this, I get the following output:

The processing has been started
('key', ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']), window(start=2021-03-01T00:00:00Z, end=2021-03-01T00:00:10Z)
('key', ['J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S']), window(start=2021-03-01T00:00:10Z, end=2021-03-01T00:00:20Z)
('key', ['T']), window(start=2021-03-01T00:00:20Z, end=2021-03-01T00:00:30Z)

As far as I understand, a window emits its result when the watermark reaches it. The first watermark advance is to 5 seconds, so I expected the first pane to contain only [A, B, C, D]. The fixed window is 10 seconds long, but because the watermark is advanced before E, F, G, H, and I arrive in the stream, I expected those elements to be dropped as late data. Instead, they all appear in the first pane.

Can anyone explain?
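
To make the expectation above easier to check, here is a minimal plain-Python sketch (no Beam involved) of how I understand the rules: an element is assigned to a fixed window by its event timestamp, and it is only dropped as late if the watermark has already passed the end of that window plus the allowed lateness. The names window_for and simulate are hypothetical helpers written for this post, timestamps are seconds after 2021-03-01T00:00:00Z, and the lateness check is a simplification of what a real runner does.

```python
def window_for(ts, size):
    """Return the (start, end) fixed window containing event timestamp ts."""
    start = ts - ts % size
    return (start, start + size)


def simulate(events, size, allowed_lateness=0):
    """Replay a TestStream-like event list (simplified model, not Beam itself).

    events holds ("watermark", ts) or ("element", value, ts) tuples.
    Returns (panes, dropped): elements grouped by window, and late drops.
    """
    watermark = float("-inf")
    panes = {}
    dropped = []
    for event in events:
        if event[0] == "watermark":
            watermark = event[1]
            continue
        _, value, ts = event
        window = window_for(ts, size)
        # Assumption: an element is dropped only if its window has already
        # expired, i.e. the watermark passed window end + allowed lateness.
        if watermark >= window[1] + allowed_lateness:
            dropped.append(value)
        else:
            panes.setdefault(window, []).append(value)
    return panes, dropped


# Rebuild the stream from the question: A..T at seconds 1..20, with the
# watermark advanced to 5, 10, 15, and 20 just before E, J, O, and T.
events = []
for i, letter in enumerate("ABCDEFGHIJKLMNOPQRST"):
    ts = i + 1
    if ts % 5 == 0:
        events.append(("watermark", ts))
    events.append(("element", letter, ts))
events.append(("watermark", 25))

panes, dropped = simulate(events, size=10)
for window, values in sorted(panes.items()):
    print(window, values)
print("dropped:", dropped)
```

Under these assumptions, the watermark at 5 seconds has not yet passed the end of the window [0, 10), so E through I still belong to that window and nothing is dropped, which matches the three panes in the output above.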
