Alter

Reputation: 1213

Deactivate onTimer on KeyedProcessFunction in Flink

Let's take this as an example:

I trigger the onTimer function in a KeyedProcessFunction based on this logic:

(when a == "start" -> ctx.timerService().registerProcessingTimeTimer(some time in long)),

but then a new record arrives that marks the end of the ride:

(when a == "end" -> ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()))

This second timer triggers the action immediately, and the idea is to clean up the timer that was set before, because it makes no sense for the previous timer to stay alive.

The point is that if the second action doesn't happen within, say, one hour, I need to do something (ctx.timerService().registerProcessingTimeTimer(some time in long)). But if the expected value arrives within that hour, there is no need to fire that timer; instead I want to fire a timer instantly and forget the one scheduled earlier (ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime())). I'm having issues because the earlier timer fires no matter what happens.

I tried to use ctx.timerService().deleteProcessingTimeTimer(some time in long); but it doesn't seem to work.

See the example:

Event order: event A will always arrive before B. Explanation: event B must arrive within one hour after event A arrives; otherwise the timer will fire one hour after A arrives. But if B arrives after the one-hour timer has been set, a timer should fire instantly and the previously defined timer must never be called (it should be deleted).

SingleOutputStreamOperator<Event> abandonment = stream
        .keyBy(e -> e.id)
        .process(new KeyedProcessFunctionName());


public class KeyedProcessFunctionName extends KeyedProcessFunction<String, Event, Event> {

    @Override
    public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
        if (e.value.equalsIgnoreCase("B")) {
            // B arrived: register a timer that fires immediately.
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime());
        }

        if (stateTwo.value() == null && e.value.equalsIgnoreCase("A")) {
            // A arrived: schedule the timeout timer.
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + SOME_FIXED_TIME_IN_LONG);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // When this timer fires because of B, it must not fire again
        // because of the earlier timer that was set for A.
    }
}

Any idea?

Upvotes: 1

Views: 1304

Answers (1)

David Anderson

Reputation: 43524

Timers are always (implicitly) bound to a key. When you create a timer it is associated with the key of the event being processed (or the key of the timer currently firing). Similarly, you can only delete timers associated with the key currently in context. If timer deletion doesn't seem to be working, perhaps this is why.

Another fact to keep in mind is that timers are deduplicated. In other words, for any given (key, timestamp) pair, there can be at most one event-time timer, and one processing-time timer. Subsequent attempts to register another timer for the same key and timestamp will be ignored.

Sometimes it's helpful to use keyed state to remember what should be done when a timer (for the same key) fires. If you have many timers per key, you could use MapState indexed by the timestamp to keep some state for each timer.
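As a rough illustration of the keyed-state idea, here is a plain-Java sketch that models the pattern without any Flink dependency, so it is not Flink code. The class name RideTimeoutModel, the pendingTimer map, the advanceTo method and the output strings are all hypothetical. In a real job, pendingTimer would presumably be a ValueState<Long>, the timers map would be ctx.timerService(), and the removal step would be a call to deleteProcessingTimeTimer with the stored value. The point it tries to show is that the timer for A is only cancelled when it is deleted under the same key and with the exact timestamp it was registered with, which is why that timestamp is remembered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model of "remember the timer in keyed state, then delete it". Not Flink code. */
final class RideTimeoutModel {
    private final long timeoutMs;
    // Stands in for a ValueState<Long>: the pending timeout timestamp for each key.
    private final Map<String, Long> pendingTimer = new HashMap<>();
    // Stands in for the timer service: timestamp -> keys that have a timer at that time.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    // Collects what would be emitted through the Collector.
    final List<String> output = new ArrayList<>();

    RideTimeoutModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Mirrors processElement; `now` plays the role of currentProcessingTime(). */
    void processElement(String key, String value, long now) {
        if (value.equalsIgnoreCase("A") && !pendingTimer.containsKey(key)) {
            long fireAt = now + timeoutMs;
            timers.computeIfAbsent(fireAt, t -> new TreeSet<>()).add(key); // register the timer
            pendingTimer.put(key, fireAt); // remember the exact timestamp for later deletion
        } else if (value.equalsIgnoreCase("B")) {
            Long fireAt = pendingTimer.remove(key);
            if (fireAt != null) {
                // Delete using the same key and the same timestamp used at registration.
                Set<String> keys = timers.get(fireAt);
                if (keys != null) {
                    keys.remove(key);
                    if (keys.isEmpty()) {
                        timers.remove(fireAt);
                    }
                }
            }
            // Act right away instead of registering a second, zero-delay timer.
            output.add(key + ":completed@" + now);
        }
    }

    /** Mirrors onTimer: fires every timer whose timestamp is <= now. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                pendingTimer.remove(key); // the timer has fired, nothing left to cancel
                output.add(key + ":timeout@" + due.getKey());
            }
        }
    }

    public static void main(String[] args) {
        RideTimeoutModel m = new RideTimeoutModel(3_600_000L);
        m.processElement("ride-1", "A", 1_000L); // timeout scheduled at 3_601_000
        m.processElement("ride-2", "A", 1_000L); // separate key, its own timer
        m.processElement("ride-1", "B", 2_000L); // cancels only ride-1's timer
        m.advanceTo(4_000_000L);                 // only ride-2 times out
        System.out.println(m.output);
        // prints [ride-1:completed@2000, ride-2:timeout@3601000]
    }
}
```

In this sketch B is handled directly in processElement rather than through a second timer, which avoids having to tell the two timers apart in onTimer. If the action really has to run in onTimer, the same stored timestamp could instead be compared against the timestamp argument to decide which timer is firing.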

Upvotes: 3
