Patze

Reputation: 307

How to use Flink streaming timeWindow with timestamp and watermark assigners?

I'm working on a Flink streaming processor that reads events from Kafka. Those events are keyed by one of their fields and should be windowed over a period of time before being reduced and emitted. My processor uses event time as its time characteristic and therefore reads the timestamp from the events it consumes. Here's what it currently looks like:

source
    .map(new MapEvent())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(Event event) {
            return event.getTimestamp();
        }
    })
    .keyBy(new KeySelector())
    .timeWindow(Time.minutes(1))
    .reduce(new EventReducer())
    .map(new MapToResult());

What I know about the events is the following:

  1. They are unordered with regard to their event time.
  2. Late arrivals are possible, so events might arrive significantly later than their timestamp says. For simplicity, let's say I know that the latest possible arrival is 20 seconds late.
  3. I want my activities to be windowed for exactly one minute before Flink forwards them into the following reduce operator.

And finally, here are my questions:

  1. Given the use case described above, is the BoundedOutOfOrdernessTimestampExtractor a good choice? I've read through the docs and saw the AssignerWithPunctuatedWatermarks and the other predefined assigners available for watermark creation, but I didn't completely understand whether those would be better for me.
  2. How does assignTimestampsAndWatermarks() play together with the timeWindow() method? Can they interfere when it comes to late arrivals? Is there anything in that area that I have to keep in mind?

Upvotes: 4

Views: 2432

Answers (2)

ZeMi

Reputation: 45

Maybe your watermark is always smaller than the window end time, so it never triggers the window to yield results. A window is triggered when both of the following hold:

  1. watermark >= window end time.
  2. There are some elements in this window.
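
To make the two conditions concrete, here is a minimal plain-Java sketch (no Flink dependency). The class and method names are made up for illustration, and the boundary is simplified: as far as I know, Flink's own event-time trigger compares the watermark against the last millisecond that still belongs to the window rather than the exclusive end, so treat the exact comparison as an approximation.

```java
// Hypothetical helper, not part of Flink: models when a tumbling
// event-time window would yield a result.
class WindowFiringSketch {

    // Start of the epoch-aligned tumbling window containing the timestamp.
    // Assumes non-negative timestamps.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Both conditions from the list above must hold.
    static boolean shouldFire(long watermarkMs, long windowEndMs, int elementCount) {
        return watermarkMs >= windowEndMs && elementCount > 0;
    }

    public static void main(String[] args) {
        long sizeMs = 60_000L;                        // one-minute window
        long start = windowStart(125_000L, sizeMs);   // event at 125 s
        long end = start + sizeMs;

        System.out.println("window = [" + start + ", " + end + ")");
        System.out.println("watermark just before end: " + shouldFire(end - 1, end, 3));
        System.out.println("watermark at end:          " + shouldFire(end, end, 3));
        System.out.println("empty window:              " + shouldFire(end, end, 0));
    }
}
```

If the watermark never reaches the window end (for example because the source stops delivering newer events), the second call never becomes true and no result is emitted.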

Upvotes: 1

Dawid Wysakowicz

Reputation: 3422

I think we should start with the watermark concept. Briefly speaking, a watermark says that the majority of events with earlier timestamps have already arrived. Based on that assumption, timeWindow can emit a window once the watermark passes the end of that window. Of course late arrivals can still happen, and you may want to handle them. This is where allowedLateness comes in: it specifies how long after emitting the window Flink should keep track of the elements that were in it, so that you could, for example, update your sink with those late events (but remember that the window was already emitted once without them). I hope this somehow answers your second question.
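
As a rough illustration of how the watermark, the window end and allowedLateness interact, here is a plain-Java sketch without any Flink dependency. The class, the enum and the exact boundary comparisons are my own simplification for this answer, not Flink's internal code.

```java
// Hypothetical model of what happens to an element depending on where the
// watermark is when the element arrives.
class AllowedLatenessSketch {

    enum Outcome { ON_TIME, LATE_UPDATE, DROPPED }

    static Outcome classify(long watermarkMs, long windowEndMs, long allowedLatenessMs) {
        if (watermarkMs < windowEndMs) {
            // The window has not been emitted yet; the element is part of the first result.
            return Outcome.ON_TIME;
        }
        if (watermarkMs < windowEndMs + allowedLatenessMs) {
            // The window was already emitted, but its state is still kept,
            // so the element leads to an updated (duplicate) result.
            return Outcome.LATE_UPDATE;
        }
        // The window state is gone; the element can no longer be added.
        return Outcome.DROPPED;
    }

    public static void main(String[] args) {
        long windowEndMs = 60_000L;
        long latenessMs = 20_000L; // the 20 seconds from the question

        System.out.println(classify(50_000L, windowEndMs, latenessMs));
        System.out.println(classify(70_000L, windowEndMs, latenessMs));
        System.out.println(classify(80_000L, windowEndMs, latenessMs));
    }
}
```

The LATE_UPDATE case is the reason your sink has to cope with more than one result per window and key.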

Coming back to your first question: if you have many events that can be late by 20 seconds, I think the BoundedOutOfOrdernessTimestampExtractor is the best choice. Note, though, that this way the emission of every window is delayed by those 20 seconds. If late arrivals are rather sporadic and you can handle duplicates, then you may think about another assigner.
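
Where the 20 second delay comes from is easier to see in a small model. The sketch below is plain Java and only mimics the idea behind a bounded-out-of-orderness assigner (watermark = highest timestamp seen so far minus the bound); the class and its methods are hypothetical, and periodic emission is left out.

```java
// Hypothetical stand-in for a bounded-out-of-orderness watermark generator.
class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark is Long.MIN_VALUE without overflow.
        this.maxSeenTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    void onEvent(long timestampMs) {
        // Out-of-order events never move the maximum backwards.
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, timestampMs);
    }

    long currentWatermark() {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    // Convenience for experiments: feed all timestamps, return the final watermark.
    static long watermarkAfter(long boundMs, long... timestampsMs) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(boundMs);
        for (long ts : timestampsMs) {
            sketch.onEvent(ts);
        }
        return sketch.currentWatermark();
    }

    public static void main(String[] args) {
        long boundMs = 20_000L;
        // The late event at 50 s does not lower the watermark.
        System.out.println(watermarkAfter(boundMs, 30_000L, 75_000L, 50_000L));
        // A window ending at 60 s can only be emitted once an event at 80 s or later shows up.
        System.out.println(watermarkAfter(boundMs, 30_000L, 80_000L));
    }
}
```

So with a 20 second bound, a window that ends at 60 s is held back until the stream has progressed to 80 s in event time.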

The AssignerWithPunctuatedWatermarks you mentioned should, as the docs say, be used when particular events in your stream already act as watermarks. So I don't think it suits your use case.
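
To show what "events that act as a watermark" means, here is a plain-Java sketch of the punctuated idea. The Event class with its isMarker flag and the helper methods are invented for this example; your events would need some field like that for the approach to make sense, which from your description they do not have.

```java
// Hypothetical model of punctuated watermarks: only marker events advance the watermark.
class PunctuatedWatermarkSketch {

    static final class Event {
        final long timestampMs;
        final boolean isMarker; // assumed flag; not part of the events in the question

        Event(long timestampMs, boolean isMarker) {
            this.timestampMs = timestampMs;
            this.isMarker = isMarker;
        }
    }

    // Returns the watermark carried by this event, or null if it carries none.
    static Long watermarkFor(Event event) {
        if (event.isMarker) {
            return Long.valueOf(event.timestampMs);
        }
        return null;
    }

    // Feeds the events in order and returns the last watermark reached.
    static long lastWatermark(Event... events) {
        long watermark = Long.MIN_VALUE;
        for (Event event : events) {
            Long candidate = watermarkFor(event);
            if (candidate != null && candidate > watermark) {
                watermark = candidate;
            }
        }
        return watermark;
    }

    public static void main(String[] args) {
        long wm = lastWatermark(
                new Event(10_000L, false),
                new Event(20_000L, true),   // the only event that advances the watermark
                new Event(30_000L, false));
        System.out.println(wm);
    }
}
```

Without marker events the watermark never moves in this model, so no window would ever be emitted.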

For more info on watermark you may read this doc or this and that

Upvotes: 4
