asm
asm

Reputation: 947

Flink Trigger when State expires

I have an interesting use case which I want to test with Flink. I have an incoming stream of Message which is either PASS or FAIL. Now if the message is of type FAIL, I have a downstream ProcessFunction which saves the Message state and then sends pause commands to everything that depends on this. When I receive a PASS message which is associated with the FAIL I had received earlier (keying by message id), I send resume commands to everything I had paused earlier.

Now I plan on using State TTL to expire the stored FAIL state and resume everything after a certain timeout even if I haven't received a PASS message with the same message id. Could this be done with Flink alone or would I need to have some external timer to send timeout messages to my program?

I had something like this in mind to get it working in Flink:

For each Message, add timestamp and pass it on to a process function which waits until current_ts - timestamp == timeout before sending it on to resume everything paused by the module. Is there a better way or do you guys think this is ok?

Upvotes: 0

Views: 942

Answers (1)

David Anderson
David Anderson

Reputation: 43697

Seems like it would be more straightforward to use a timer to expire the state (by calling state.clear() in the onTimer method), rather than using state TTL. The same onTimer method can also arrange for things to resume at the same time.

Upvotes: 2

Related Questions