David_Zizu

Reputation: 1039

Events fired after the end of the window when processing an unbounded stream of data

Given: an unbounded stream of data with some duplicate events that are located close to each other (in terms of time); duplicate events have the same event timestamp and the same unique id
Goal: remove duplicates from the stream

I planned to apply a fixed-size window, then group the events by a unique key and fire early. So I have something like the following:

from apache_beam import GroupByKey, Map, ParDo, WindowInto
from apache_beam.transforms.trigger import AccumulationMode, AfterCount
from apache_beam.transforms.window import FixedWindows

(
    element
    | "Add timestamp to event" >> ParDo(AddTimestampCustomFn())    # simply returns a TimestampedValue
    | "Select unique key" >> Map(lambda elem: (elem.id, elem))
    | "Apply window" >> WindowInto(
        FixedWindows(50),
        trigger=AfterCount(1),
        accumulation_mode=AccumulationMode.DISCARDING,
        allowed_lateness=0,
    )
    | "Group events by id" >> GroupByKey()
    | "Print results" >> ParDo(CustomPrintFn())    # simply prints the first element from the grouped elements along with its timestamp
)

Everything seems to be working as expected, but I realized that even after some time has passed it's still possible to process an event that belongs to a time window which supposedly should have already passed.

Let's say we have the following events with the corresponding timestamps: [('a', 0), ('a', 1), ('a', 40), ('b', 20), ('c', 90), ('d', 140), ('f', 1)]. I would expect my output to be something like: [('a', 49), ('b', 49), ('c', 99), ('d', 149)]. However, in addition to the above output I also get the f event, so the actual output is [('a', 49), ('b', 49), ('c', 99), ('d', 149), ('f', 49)]. It's worth noting that the timestamp of the grouped events is equal to the last timestamp of their window. I don't understand why I get the f event: the window is fixed, its length is 50 seconds, and allowed_lateness is set to 0. I would also assume that the watermark has already passed the end of that window by the time f arrives. Why is event f still there?

I also tried executing the code without the GroupByKey step, but it seems to produce similar results. Let's say we have the following data as our input: [('a', 1), ('b', 90), ('c', 140), ('f', 1)]. Then the result still includes event f: [('a', 49), ('b', 99), ('c', 149), ('f', 49)]. Here is the simplified code:

(
    element
    | "Add timestamp to event" >> ParDo(AddTimestampCustomFn())
    | "Apply window" >> WindowInto(FixedWindows(50))
    | "Print results" >> ParDo(CustomPrintFn())
)

I feel like my understanding of fixed windows is wrong, but I can't figure out why.

Upvotes: 0

Views: 195

Answers (1)

danielm

Reputation: 3010

The watermark behavior depends on the source; it may not be advancing the way you think, so all of the data here is probably on-time and not dropped.

In particular, if you are using Create, the watermark starts at negative infinity, all elements are injected into the pipeline, then the watermark advances to positive infinity. This ensures that none of the data is late, regardless of the order you gave to Create.

If you want to explicitly control both element injection and the watermark for testing purposes, you can use TestStream.
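
To make the difference concrete, here is a rough plain-Python model of the two injection patterns. It is not Beam code: the `run` and `window_end` helpers, the step tuples, and the exact "drop if the watermark has reached the window end plus allowed lateness" rule are simplifying assumptions made for illustration only. The input is the second example from the question.

```python
import math

WINDOW_SIZE = 50  # seconds, mirroring FixedWindows(50) from the question


def window_end(ts, size=WINDOW_SIZE):
    """Exclusive end of the fixed window that contains timestamp ts."""
    return ts - ts % size + size


def run(steps, allowed_lateness=0):
    """Replay ('element', id, ts) and ('watermark', value) steps in order.

    Assumed, simplified lateness rule: an element is dropped when the
    watermark has already reached the end of its window plus the allowed
    lateness at the moment the element arrives.
    """
    watermark = -math.inf
    kept = []
    for step in steps:
        if step[0] == "watermark":
            # A watermark never moves backwards.
            watermark = max(watermark, step[1])
        else:
            _, elem_id, ts = step
            if watermark < window_end(ts) + allowed_lateness:
                # Report end - 1, as in the question's printed output.
                kept.append((elem_id, window_end(ts) - 1))
    return kept


events = [("a", 1), ("b", 90), ("c", 140), ("f", 1)]

# Create-like: every element arrives while the watermark is still -inf,
# and only then does the watermark jump to +inf.
create_like = [("element", i, t) for i, t in events] + [("watermark", math.inf)]

# Explicit control: the watermark passes window [0, 50) before 'f' arrives.
controlled = [
    ("element", "a", 1),
    ("element", "b", 90),
    ("element", "c", 140),
    ("watermark", 150),
    ("element", "f", 1),
    ("watermark", math.inf),
]

print(run(create_like))
print(run(controlled))
```

In this model the Create-like run keeps f, matching what the question observes, while the controlled run drops it. With TestStream, the second pattern would correspond to interleaving `add_elements` calls with `advance_watermark_to` calls; whether a real runner drops f in exactly the same place depends on the runner's own lateness handling.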

Upvotes: 1
