salvob

Reputation: 1370

apache beam (python SDK): Late (or early) events discarded and triggers. How to know how many discarded and why?

I have a streaming pipeline connected to a Pub/Sub subscription (with around 2 million elements every hour). I need to collect them in groups and then extract some information.

def expand(self, pcoll):
    return (
        pcoll
        | beam.WindowInto(FixedWindows(10),
                          trigger=AfterAny(AfterCount(2000), AfterProcessingTime(30)),
                          allowed_lateness=10,
                          trigger=AfterAny(
                              AfterCount(8000),
                              AfterProcessingTime(30),
                              AfterWatermark(
                                  early=AfterProcessingTime(60),
                                  late=AfterProcessingTime(60)
                              )
                          ),
                          allowed_lateness=60 * 60 * 24,
                          accumulation_mode=AccumulationMode.DISCARDING)
        | "Group by Key" >> beam.GroupByKey()
    )

I try my best NOT to miss any data, but I found out that around 4% of my data is missing. As you can see in the code, I fire a trigger whenever I hit 8k elements or every 30 seconds, I allow a lateness of 1 day, and the trigger should fire for both early and late events.

Still, those 4% are missing. So, is there a way to know whether the pipeline is discarding some data? How many elements? And for which reason?

Thank you so much in advance

Upvotes: 0

Views: 1076

Answers (1)

Iñigo

Reputation: 2670

First, I see you have two `trigger` (and two `allowed_lateness`) arguments in the sample code; I assume this is a typo, though.

It looks like you are dropping elements because you are not using Repeatedly, so every element arriving after the first trigger firing gets lost. There's an official doc on this from Beam.

Allow me to post an example:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import trigger
from apache_beam.transforms.window import FixedWindows, TimestampedValue

test_stream = (TestStream()
               .add_elements([
                    TimestampedValue('in_time_1', 0),
                    TimestampedValue('in_time_2', 0)])
               .advance_watermark_to(9)
               .advance_processing_time(9)
               .add_elements([TimestampedValue('late_but_in_window', 8)])
               .advance_watermark_to(10)
               .advance_processing_time(10)
               .add_elements([TimestampedValue('in_time_window2', 12)])
               .advance_watermark_to(20)  # Past window time
               .advance_processing_time(20)
               .add_elements([TimestampedValue('late_window_closed', 9),
                              TimestampedValue('in_time_window2_2', 12)])
               .advance_watermark_to_infinity())


class RecordFn(beam.DoFn):
    def process(
        self,
        element=beam.DoFn.ElementParam,
        timestamp=beam.DoFn.TimestampParam):

        yield ("key", (element, timestamp))



options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with TestPipeline(options=options) as p:
    records = (p | test_stream
                 | beam.ParDo(RecordFn())
                 | beam.WindowInto(FixedWindows(10),
                                   allowed_lateness=0,
                                   # trigger=trigger.AfterCount(1),
                                   trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                                   accumulation_mode=trigger.AccumulationMode.DISCARDING)
                 | beam.GroupByKey()
                 | beam.Map(print)
               )

If we use trigger trigger.Repeatedly(trigger.AfterCount(1)), all elements are fired as they come, with no dropped elements (except late_window_closed, which is expected, since it arrived after its window had closed and allowed_lateness is 0):

('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])  # these two are together since they arrived together
('key', [('late_but_in_window', Timestamp(8))])
('key', [('in_time_window2', Timestamp(12))])
('key', [('in_time_window2_2', Timestamp(12))])

If we use trigger.AfterCount(1) (without Repeatedly), we only get the first firing of each window:

('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])
('key', [('in_time_window2', Timestamp(12))])

Note that both in_time_1 and in_time_2 appear in the first fired pane because they arrived at the same time (0); had one of them arrived later, it would have been dropped.
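The dropping behavior can also be modeled outside Beam with a small pure-Python sketch (this is NOT Beam's implementation; the OneShotAfterCount and RepeatedAfterCount classes below are hypothetical and only illustrate why elements arriving after a one-shot trigger fires get lost):

```python
class OneShotAfterCount:
    """Fires once when `count` elements are buffered; afterwards drops everything."""
    def __init__(self, count):
        self.count = count
        self.buffer = []
        self.finished = False
        self.panes = []    # panes fired so far
        self.dropped = []  # elements lost after the trigger finished

    def receive(self, element):
        if self.finished:
            self.dropped.append(element)  # trigger already finished: data lost
            return
        self.buffer.append(element)
        if len(self.buffer) >= self.count:
            self.panes.append(list(self.buffer))
            self.buffer.clear()
            self.finished = True          # never fires again


class RepeatedAfterCount(OneShotAfterCount):
    """Same, but resets after each firing instead of finishing."""
    def receive(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= self.count:
            self.panes.append(list(self.buffer))
            self.buffer.clear()           # ready to fire again


one_shot = OneShotAfterCount(2)
repeated = RepeatedAfterCount(2)
for e in ["a", "b", "c", "d"]:
    one_shot.receive(e)
    repeated.receive(e)

print(one_shot.panes, one_shot.dropped)  # [['a', 'b']] ['c', 'd']
print(repeated.panes, repeated.dropped)  # [['a', 'b'], ['c', 'd']] []
```

The one-shot version silently loses "c" and "d", which is the same shape of data loss as the 4% observed in the question.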

Upvotes: 5
