I am writing a Google Dataflow job that reads JSON messages like the one below from a Pub/Sub topic.
{
"viewing_id": 1000,
"viewing_created_at": "2022-12-01 14:30:00 UTC"
}
During the transformation, I set the element timestamp as follows. In the JSON-reading step, each element gets a timestamp attribute set to the Unix timestamp (an int) derived from viewing_created_at.
| 'Timestamp' >> beam.Map(lambda viewing: beam.window.TimestampedValue(viewing, viewing.timestamp))
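The question does not show the JSON-reading step, so here is a minimal stdlib sketch of one way to derive that integer timestamp. parse_viewing is a hypothetical helper; it assumes viewing_created_at always has the form "YYYY-MM-DD HH:MM:SS UTC" as in the sample, and it returns a dict rather than an object, so in a pipeline the timestamp would be read as v["timestamp"] instead of viewing.timestamp.

```python
import json
from datetime import datetime, timezone


def parse_viewing(message: bytes) -> dict:
    """Decode one Pub/Sub payload and attach an integer Unix 'timestamp' field.

    Hypothetical helper: assumes viewing_created_at always looks like
    'YYYY-MM-DD HH:MM:SS UTC', as in the sample message.
    """
    viewing = json.loads(message.decode("utf-8"))
    created = datetime.strptime(viewing["viewing_created_at"], "%Y-%m-%d %H:%M:%S UTC")
    # strptime returns a naive datetime; mark it as UTC so .timestamp()
    # does not interpret it in the worker's local time zone.
    viewing["timestamp"] = int(created.replace(tzinfo=timezone.utc).timestamp())
    return viewing


sample = b'{"viewing_id": 1000, "viewing_created_at": "2022-12-01 14:30:00 UTC"}'
print(parse_viewing(sample)["timestamp"])  # 1669905000
```

Marking the datetime as UTC explicitly matters because a naive datetime's .timestamp() uses the local time zone of whichever machine runs it.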
All of the above works, but I am not sure what windowing to use for my use case, and I find the documentation a bit hard to digest: https://beam.apache.org/documentation/programming-guide/#event-time-triggers
I want results to be emitted once the window ends, plus a delay of 5 minutes. The script that publishes to the topic runs immediately after a viewing is created, so there may be a lag of a few seconds between publishing and Dataflow reading the message. So I want to wait 5 minutes and then process the data. I do not care about anything that arrives afterwards.
This is what I have so far...
| beam.WindowInto(
    window.FixedWindows(15),  # size is in seconds: 15 s here, 60 * 15 for 15 minutes
    trigger=AfterWatermark(),
    allowed_lateness=0,
    accumulation_mode=AccumulationMode.DISCARDING
)
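FixedWindows takes its size in seconds, so FixedWindows(15) creates 15-second windows rather than 15-minute ones. The pure-Python sketch below reproduces the assignment rule that Beam's FixedWindows(size, offset) applies (start = timestamp - (timestamp - offset) % size) to make the difference concrete; fixed_window_bounds is a hypothetical helper, not a Beam API, and it works in whole seconds rather than Beam's Timestamp objects.

```python
from typing import Tuple


def fixed_window_bounds(timestamp: int, size: int, offset: int = 0) -> Tuple[int, int]:
    """Return the [start, end) bounds, in seconds, of the fixed window holding timestamp.

    Hypothetical helper mirroring the rule FixedWindows(size, offset) uses:
    start = timestamp - (timestamp - offset) % size.
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# An element stamped 2022-12-01 14:32:05 UTC (Unix time 1669905125):
ts = 1669905125
print(fixed_window_bounds(ts, 15))       # (1669905120, 1669905135): a 15-second window
print(fixed_window_bounds(ts, 60 * 15))  # (1669905000, 1669905900): a 15-minute window
```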
I read about AfterWatermark.pastEndOfWindow but was not sure how to use it or how to add a delay.
To be clear, I only want to trigger once per window. I want to wait a certain time and aggregate results collected so far and ignore anything else.
"To be clear, I only want to trigger once per window. I want to wait a certain time and aggregate results collected so far and ignore anything else."
The following code uses 15-minute fixed windows and accepts late data up to 5 minutes after the end of the window.
It fires an on-time pane once per window, when the watermark passes the end of the window. After that, late=AfterCount(1) fires an extra pane for every late element that arrives within the allowed lateness.
Anything later than the 5-minute lateness horizon is discarded.
# from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark, Repeatedly
| "WindowBy15Minutes" >> beam.WindowInto(
    beam.window.FixedWindows(60 * 15),  # 15 minutes
    trigger=Repeatedly(AfterWatermark(late=AfterCount(1))),
    allowed_lateness=60 * 5,  # 5 minutes
    accumulation_mode=AccumulationMode.DISCARDING)
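To make the firing behaviour concrete, here is a toy, single-window model of AfterWatermark(late=AfterCount(1)) with allowed_lateness and DISCARDING accumulation. It is a simplification for illustration only (Pane and simulate_panes are hypothetical, not Beam APIs): times are integer seconds, every arrival is assumed to belong to the window, and the watermark is supplied with each arrival instead of being computed by a runner. In particular, the on-time pane is emitted at the first arrival that sees the watermark past the window end, whereas a real runner fires it as soon as the watermark moves.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Pane:
    kind: str  # "ON_TIME" or "LATE"
    elements: List[int] = field(default_factory=list)


def simulate_panes(window_end: int, allowed_lateness: int,
                   arrivals: List[Tuple[int, int]]) -> Tuple[List[Pane], List[int]]:
    """Toy, single-window model of AfterWatermark(late=AfterCount(1)), DISCARDING mode.

    arrivals: (element_id, watermark_when_it_arrives) pairs in arrival order,
    with a non-decreasing watermark. Returns (emitted panes, dropped ids).
    """
    panes: List[Pane] = []
    dropped: List[int] = []
    buffered: List[int] = []
    on_time_fired = False
    for element, watermark in arrivals:
        if not on_time_fired and watermark >= window_end:
            # Watermark has passed the end of the window: emit what was buffered, once.
            panes.append(Pane("ON_TIME", buffered))
            buffered = []  # DISCARDING: later panes do not repeat these elements
            on_time_fired = True
        if not on_time_fired:
            buffered.append(element)  # on-time data waits for the watermark
        elif watermark < window_end + allowed_lateness:
            panes.append(Pane("LATE", [element]))  # AfterCount(1): one pane per late element
        else:
            dropped.append(element)  # past the lateness horizon: discarded
    return panes, dropped


# 15-minute window [0, 900) with 5 minutes (300 s) of allowed lateness.
arrivals = [(1, 100), (2, 850), (3, 950), (4, 1100), (5, 1250)]
panes, dropped = simulate_panes(900, 300, arrivals)
for pane in panes:
    print(pane.kind, pane.elements)
# ON_TIME [1, 2]
# LATE [3]
# LATE [4]
print(dropped)  # [5]: it arrived after the watermark reached 900 + 300
```

With allowed_lateness=0 the same arrivals yield only the on-time pane [1, 2], and elements 3, 4 and 5 are all dropped, which mirrors the default behaviour where late data are simply discarded.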
EDIT:
The default trigger fires when the watermark passes the end of the window and is equivalent to the line below. With the default allowed_lateness of 0, late data are simply dropped.
trigger=Repeatedly(AfterWatermark())
To handle late data, we introduce a lateness horizon, set with allowed_lateness. In your case, allowing data up to 5 minutes late means each window stays open until the watermark is 5 minutes past the end of the window.
trigger=Repeatedly(AfterWatermark(late=AfterCount(1))),
allowed_lateness=60 * 5,  # 5 minutes, in seconds
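We added late=AfterCount(1) so that, after the on-time pane, each late element arriving within the allowed lateness is emitted in its own pane right away instead of being dropped. Speculative results before the window closes would come from the early argument instead, e.g. AfterWatermark(early=AfterCount(1)), and a processing-time trigger such as AfterProcessingTime() can be used there as well. You can skip early firings, as you need only one result per window.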