AmerS
AmerS

Reputation: 118

Apache Flink: emit output records periodically even if no input records have arrives for a given window

I keep only the last event saved in a state, and want to emit that state periodically.

Same issue is explained here, but the accepted answer only give a hint to use timers. Which did not work for me.

Also found this, but it was a different issue; the input stream there would have so many events in every window. my main problem is that my input stream don't have events for every and each window, yet I want to produce output for every and each window.

First Failed solution:

// pseudo code:
keyedStream.window(1 Minute).reduce() // does not emit when no events arrive

Another Failed Solution:

//Tried with a process function and a timer,
// but timer could only be created in processElement and fires only once.
// it is not periodic

class MyProcessFunction extends KeyedProcessFunction<Integer, Input, Output>{

    public void processElement(Input event, Context context, Collector<Output> out) throws Exception {

        savedState.update(event) // save last event in a state

        // This creates a timer on every event, which is OK
        // and I can easily change the code so that the timer get created only on first event
        // But there was no way to make the timer periodic, independent of arrival of events
        context.timerService().registerEventTimeTimer(someWindow);            
    }

    public void onTimer(long timestamp, OnTimerContext context, Collector<Output> out) throws Exception {
        // this emit exactly once
        out.collect(savedState.value());

        // was hoping for something like 
        // context.timerService().registerEventTimeTimer(timestamp + someWindow);

       // but there is no access to timerService() here, so I couldn't make the timer periodic
    } 
}

Any suggestions?

Upvotes: 1

Views: 753

Answers (1)

David Anderson
David Anderson

Reputation: 43409

You can register timers in the onTimer method; the OnTimerContext does have a TimerService. What you were hoping for is how it is normally used.

Perhaps if you provide more details we can sort out why it didn't work for you.

Upvotes: 2

Related Questions