kinder

How can I implement keyed window timeouts in Flink?

I have keyed events arriving on a stream. I would like to accumulate them by key until a timeout expires (say, 5 minutes), and then process the events accumulated up to that point (and ignore everything that arrives for that key afterwards, but first things first).

I am new to Flink, but conceptually I think I need something like the code below.

    DataStream<Tuple2<String, String>> dataStream = see
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .window(GlobalWindows.create())
            .trigger(ProcessingTimeTrigger.create()) // how do I set the timeout value?
            .fold(new Tuple2<>("", ""), new FoldFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                public Tuple2<String, String> fold(Tuple2<String, String> agg, Tuple2<String, String> elem) {
                    if ( agg.f0.isEmpty()) {
                        agg.f0 = elem.f0;
                    }
                    if ( agg.f1.isEmpty()) {
                        agg.f1 = elem.f1;
                    } else {
                        agg.f1 = agg.f1 + "; " + elem.f1;
                    }
                    return agg;
                }
            });

This code does NOT compile because a ProcessingTimeTrigger needs a TimeWindow, and GlobalWindow is not a TimeWindow. So...

How can I accomplish keyed window timeouts in Flink?

David Anderson

You will have a much easier time if you approach this with a KeyedProcessFunction.

I suggest creating an item of keyed ListState in the open() method of a KeyedProcessFunction. In the processElement() method, if the list is empty, register a processing-time timer (timers are per key; compute the firing time relative to the current processing time) to fire when you want the window to end. Then append the incoming event to the list.

When the timer fires, the onTimer() method will be called, and you can iterate over the list, produce a result, and clear the list.
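A minimal sketch of that function, assuming the Flink 1.x DataStream API and the question's `Tuple2<String, String>` (key, value) events. The class name `TimeoutAccumulator` and the `joinValues` helper are hypothetical names chosen for illustration, and the "; " separator simply mirrors the question's fold. It buffers values in keyed `ListState`, registers a processing-time timer when the key's buffer is empty, and emits and clears the buffer in `onTimer()`. (Newer Flink releases deprecate `open(Configuration)` in favor of `open(OpenContext)`.)

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical example: buffers values per key and, when a processing-time
 * timeout started by the key's first buffered event fires, emits
 * (key, values joined with "; ") and clears the buffer.
 */
public class TimeoutAccumulator
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>> {

    private final long timeoutMillis;
    // Per-key buffer of values received since the pending timer was registered.
    private transient ListState<String> buffer;

    public TimeoutAccumulator(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event, Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        Iterable<String> current = buffer.get();
        // Empty buffer means this is the first event since the last flush:
        // start the timeout clock for the current key.
        if (current == null || !current.iterator().hasNext()) {
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(now + timeoutMillis);
        }
        buffer.add(event.f1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
        // Emit everything buffered for this key, then reset the buffer.
        out.collect(Tuple2.of(ctx.getCurrentKey(), joinValues(buffer.get())));
        buffer.clear();
    }

    /** Joins values with "; ", following the same rule as the question's FoldFunction. */
    public static String joinValues(Iterable<String> values) {
        StringBuilder sb = new StringBuilder();
        if (values != null) {
            for (String v : values) {
                if (sb.length() > 0) {
                    sb.append("; ");
                }
                sb.append(v);
            }
        }
        return sb.toString();
    }
}
```

In the question's pipeline this would replace the `.window(...)`, `.trigger(...)` and `.fold(...)` calls with something like `.keyBy(value -> value.f0).process(new TimeoutAccumulator(5 * 60 * 1000L))`. As written, a later event for a key that has already been flushed starts a fresh buffer and a new timer, so each key can produce one result per timeout period.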

To do all of this only once per key, add a ValueState<Boolean> to the KeyedProcessFunction that records whether the key has already been handled. (Note that if your key space is unbounded, you should think about a strategy for eventually expiring the state for stale keys.)
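One possible way to cover both points, sketched under the assumption of the Flink 1.x state TTL API (where `StateTtlConfig.newBuilder` takes an `org.apache.flink.api.common.time.Time`; recent releases move to `java.time.Duration`, so check the signature for your version): build the `ValueState<Boolean>` descriptor with time-to-live enabled, so the flag for a key that stops appearing is eventually dropped. `EmittedFlag`, `descriptor` and the state name "emitted" are hypothetical; the class comment shows where the flag would be read and written inside a KeyedProcessFunction.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

/**
 * Hypothetical helper that builds a TTL-enabled descriptor for a per-key
 * "already emitted" flag. Inside a KeyedProcessFunction it would be used roughly as:
 *   open():           emitted = getRuntimeContext().getState(EmittedFlag.descriptor(Time.hours(24)));
 *   processElement(): if (Boolean.TRUE.equals(emitted.value())) { return; } // key already handled
 *   onTimer():        emitted.update(true); // after emitting the result
 */
public final class EmittedFlag {

    private EmittedFlag() {}

    public static ValueStateDescriptor<Boolean> descriptor(Time retention) {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(retention)
                // Reset the expiry clock when the flag is created or written.
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Treat an expired flag as absent even before it is physically cleaned up.
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("emitted", Boolean.class);
        descriptor.enableTimeToLive(ttl);
        return descriptor;
    }
}
```

Flink's state TTL is measured in processing time. The trade-off of this design is that once a key's flag has expired, a returning key is treated as new and starts another accumulation window, so the retention period should be longer than any gap after which you still want that key to be ignored.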

The documentation describes how to use Process Functions and how to work with state. You can find additional examples in the Flink training site, such as this exercise.
