Reputation: 118
I keep only the last event saved in a state, and want to emit that state periodically.
Same issue is explained here, but the accepted answer only give a hint to use timers. Which did not work for me.
Also found this, but it was a different issue; the input stream there would have so many events in every window. my main problem is that my input stream don't have events for every and each window, yet I want to produce output for every and each window.
First Failed solution:
// pseudo code:
keyedStream.window(1 Minute).reduce() // does not emit when no events arrive
Another Failed Solution:
//Tried with a process function and a timer,
// but timer could only be created in processElement and fires only once.
// it is not periodic
class MyProcessFunction extends KeyedProcessFunction<Integer, Input, Output>{
public void processElement(Input event, Context context, Collector<Output> out) throws Exception {
savedState.update(event) // save last event in a state
// This creates a timer on every event, which is OK
// and I can easily change the code so that the timer get created only on first event
// But there was no way to make the timer periodic, independent of arrival of events
context.timerService().registerEventTimeTimer(someWindow);
}
public void onTimer(long timestamp, OnTimerContext context, Collector<Output> out) throws Exception {
// this emit exactly once
out.collect(savedState.value());
// was hoping for something like
// context.timerService().registerEventTimeTimer(timestamp + someWindow);
// but there is no access to timerService() here, so I couldn't make the timer periodic
}
}
Any suggestions?
Upvotes: 1
Views: 753
Reputation: 43409
You can register timers in the onTimer
method; the OnTimerContext
does have a TimerService
. What you were hoping for is how it is normally used.
Perhaps if you provide more details we can sort out why it didn't work for you.
Upvotes: 2