Reputation: 1370
I have a streaming pipeline connected to a PubSub subscription (with around 2 million elements every hour). I need to collect the elements into groups and then extract some information from each group.
def expand(self, pcoll):
    return (
        pcoll
        | beam.WindowInto(
            FixedWindows(10),
            trigger=AfterAny(AfterCount(2000), AfterProcessingTime(30)),
            allowed_lateness=10,
            trigger=AfterAny(
                AfterCount(8000),
                AfterProcessingTime(30),
                AfterWatermark(
                    early=AfterProcessingTime(60),
                    late=AfterProcessingTime(60)
                )
            ),
            allowed_lateness=60 * 60 * 24,
            accumulation_mode=AccumulationMode.DISCARDING)
        | "Group by Key" >> beam.GroupByKey()
    )
I try my best NOT to miss any data, but I found out that I am missing around 4% of it. As you can see in the code, I trigger whenever I hit 8k elements or every 30 seconds, I allow a lateness of one day, and the trigger should fire for both early and late events.
Still missing those 4%, though. So, is there a way to know whether the pipeline is discarding some data? How many elements? For which reason?
Thank you so much in advance
Upvotes: 0
Views: 1076
Reputation: 2670
First, I see that you have two trigger arguments in the sample code; I assume this is a typo, though.
It looks like you are dropping elements because you are not using Repeatedly, so all elements after the first trigger firing get lost. There's an official doc on this from Beam.
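For instance, wrapping your composite trigger in Repeatedly should keep it firing after every pane instead of finishing after the first one. A minimal sketch of the WindowInto from your question with that change (keeping your counts and lateness, and dropping the duplicated arguments):
import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime, Repeatedly)
from apache_beam.transforms.window import FixedWindows

def expand(self, pcoll):
    return (
        pcoll
        | beam.WindowInto(
            FixedWindows(10),
            # Repeatedly re-arms the trigger after each firing, so panes
            # keep being emitted for the lifetime of the window.
            trigger=Repeatedly(
                AfterAny(AfterCount(8000), AfterProcessingTime(30))),
            allowed_lateness=60 * 60 * 24,
            accumulation_mode=AccumulationMode.DISCARDING)
        | "Group by Key" >> beam.GroupByKey()
    )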
Allow me to post an example:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import trigger
from apache_beam.transforms.window import FixedWindows, TimestampedValue

test_stream = (TestStream()
    .add_elements([
        TimestampedValue('in_time_1', 0),
        TimestampedValue('in_time_2', 0)])
    .advance_watermark_to(9)
    .advance_processing_time(9)
    .add_elements([TimestampedValue('late_but_in_window', 8)])
    .advance_watermark_to(10)
    .advance_processing_time(10)
    .add_elements([TimestampedValue('in_time_window2', 12)])
    .advance_watermark_to(20)  # Past window time
    .advance_processing_time(20)
    .add_elements([TimestampedValue('late_window_closed', 9),
                   TimestampedValue('in_time_window2_2', 12)])
    .advance_watermark_to_infinity())
class RecordFn(beam.DoFn):
    def process(
            self,
            element=beam.DoFn.ElementParam,
            timestamp=beam.DoFn.TimestampParam):
        # Emit every element under the same key so GroupByKey gathers panes.
        yield ("key", (element, timestamp))

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with TestPipeline(options=options) as p:
    records = (p | test_stream
                 | beam.ParDo(RecordFn())
                 | beam.WindowInto(FixedWindows(10),
                                   allowed_lateness=0,
                                   # trigger=trigger.AfterCount(1),
                                   trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                                   accumulation_mode=trigger.AccumulationMode.DISCARDING)
                 | beam.GroupByKey()
                 | beam.Map(print)
              )
If we use the trigger trigger.Repeatedly(trigger.AfterCount(1)), all elements are fired as they come, with no dropped element (except late_window_closed, which is expected, as it arrived after the window closed and allowed_lateness is 0):
('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))]) # these two are together since they arrived together
('key', [('late_but_in_window', Timestamp(8))])
('key', [('in_time_window2', Timestamp(12))])
('key', [('in_time_window2_2', Timestamp(12))])
If we use trigger.AfterCount(1) (no Repeatedly), we only get the first elements that arrive in each window, since the trigger finishes after its first firing:
('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])
('key', [('in_time_window2', Timestamp(12))])
Note that both in_time_1 and in_time_2 appear in the first fired pane because they arrived at the same time (0); had one of them arrived later, it would have been dropped.
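For instance (a hypothetical variant of the TestStream above, not part of the original example), delivering the two elements in separate bundles makes the second one arrive after a bare trigger.AfterCount(1) has already fired and finished, so it would be dropped:
test_stream_variant = (TestStream()
    # First bundle: fires the one and only pane under a bare AfterCount(1).
    .add_elements([TimestampedValue('in_time_1', 0)])
    # Second bundle: same window, but the trigger is finished, so this
    # element is discarded unless the trigger is wrapped in Repeatedly.
    .add_elements([TimestampedValue('in_time_2', 0)])
    .advance_watermark_to_infinity())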
Upvotes: 5