Shahid Thaika

Reputation: 2305

Apache Beam: What trigger do I need for my use case

I am coding a Google Dataflow job that reads JSON, similar to below, from a Pub/Sub Topic.

{
    "viewing_id": 1000,
    "viewing_created_at": "2022-12-01 14:30:00 UTC"
}

During the JSON reading step, I convert the viewing_created_at datetime to a Unix timestamp (int) and store it on the element as timestamp. A later transform then uses that value as the element's event timestamp:

| 'Timestamp' >> beam.Map(lambda viewing: beam.window.TimestampedValue(viewing, viewing.timestamp))
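For illustration, the conversion described above might look roughly like the sketch below. The function name parse_viewing is hypothetical and the datetime format is assumed from the sample message; the returned integer is the kind of value that would be passed to beam.window.TimestampedValue.

```python
import json
from datetime import datetime, timezone
from typing import Tuple


def parse_viewing(message: bytes) -> Tuple[dict, int]:
    """Parse a Pub/Sub payload and return (viewing, unix_seconds).

    Hypothetical helper: assumes viewing_created_at always looks like
    "2022-12-01 14:30:00 UTC", as in the sample message.
    """
    viewing = json.loads(message)
    created = datetime.strptime(
        viewing["viewing_created_at"], "%Y-%m-%d %H:%M:%S UTC"
    )
    # strptime returns a naive datetime; mark it as UTC before converting.
    unix_seconds = int(created.replace(tzinfo=timezone.utc).timestamp())
    return viewing, unix_seconds


sample = b'{"viewing_id": 1000, "viewing_created_at": "2022-12-01 14:30:00 UTC"}'
viewing, ts = parse_viewing(sample)
print(viewing["viewing_id"], ts)
```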

All of the above works, but I am not sure what to set for the windowing for my use-case and I find the documentation a bit hard to digest. https://beam.apache.org/documentation/programming-guide/#event-time-triggers

I want to emit results after the window ends, plus a delay of 5 minutes. The script that publishes to the topic runs immediately after a viewing is created, so there may be a lag of a few seconds between publishing and reading by Dataflow. So I want to wait 5 minutes and then process the data. I do not care about anything that arrives afterwards.

This is what I have so far...

    | beam.WindowInto(
        window.FixedWindows(15),
        trigger=AfterWatermark(),
        allowed_lateness=0,
        accumulation_mode=AccumulationMode.DISCARDING
    )

I read about AfterWatermark.pastEndOfWindow but was not sure how to implement it or how to set a delay.

To be clear, I only want to trigger once per window. I want to wait a certain time and aggregate results collected so far and ignore anything else.

Upvotes: 0

Views: 849

Answers (1)

Rathish Kumar B

Reputation: 1412

To be clear, I only want to trigger once per window. I want to wait a certain time and aggregate results collected so far and ignore anything else.

The following code uses a 15-minute fixed window and allows late data up to 5 minutes after the end of the window.

It fires once per window when the watermark passes the end of the window, and then again for each late element (AfterCount(1)) that arrives within the allowed lateness.

Anything arriving beyond the 5-minute lateness horizon is discarded.

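    # Assumes: from apache_beam.transforms.trigger import (
    #     AccumulationMode, AfterCount, AfterWatermark, Repeatedly)
    | "WindowBy15Minutes" >> beam.WindowInto(
        beam.window.FixedWindows(60 * 15),  # 15 minutes
        trigger=Repeatedly(AfterWatermark(late=AfterCount(1))),
        allowed_lateness=60 * 5,  # 5 minutes
        accumulation_mode=AccumulationMode.DISCARDING)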

EDIT:

By default, the trigger fires once when the watermark has passed the end of the window. In that case, late data is simply dropped.

trigger=Repeatedly(AfterWatermark())

To handle late data, we introduce a lateness horizon, defined using allowed_lateness. In your case, allowing late data for up to 5 minutes means that each window stays open for 5 minutes after the watermark has passed the end of the window.

trigger=Repeatedly(AfterWatermark(late=AfterCount(1))),
allowed_lateness=(60*5))
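As a rough illustration of the lateness horizon, here is a simplified model in plain Python. It is not Beam's implementation, and the exact boundary handling is an assumption. The names WINDOW_SIZE, ALLOWED_LATENESS, window_bounds and is_dropped are made up for this sketch. It shows which 15-minute window an event timestamp falls into, and when an element for that window would be too late.

```python
WINDOW_SIZE = 15 * 60       # 15-minute fixed windows, in seconds
ALLOWED_LATENESS = 5 * 60   # 5-minute lateness horizon, in seconds


def window_bounds(event_ts: int) -> tuple:
    """Return (start, end) of the fixed window containing event_ts."""
    start = event_ts - (event_ts % WINDOW_SIZE)
    return start, start + WINDOW_SIZE


def is_dropped(event_ts: int, watermark: int) -> bool:
    """Simplified rule: an element is dropped once the watermark is past
    the end of its window plus the allowed lateness."""
    _, end = window_bounds(event_ts)
    return watermark > end + ALLOWED_LATENESS


event_ts = 1669905000  # 2022-12-01 14:30:00 UTC
start, end = window_bounds(event_ts)
print(start, end)                          # window covers 14:30 to 14:45
print(is_dropped(event_ts, end + 4 * 60))  # watermark 4 min past the end
print(is_dropped(event_ts, end + 6 * 60))  # watermark 6 min past the end
```

With the watermark 4 minutes past the end of the window the element is still accepted; at 6 minutes it is past the horizon and dropped.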

    

We added AfterCount(1) as the late trigger so that a result is emitted as soon as a late element is received, without waiting any longer. You can also specify a processing-time trigger instead, such as AfterProcessingTime(). Since you only need one result per window, you can skip the late trigger.

Upvotes: 1
