Yue Liu

Reputation: 141

How to actively delete a state in Flink after a fixed amount of time?

In a Flink job, I want to delete state from memory 24 hours after it is created. I checked this post and set up state time-to-live (TTL), but as mentioned in this article, state removal is lazy/passive, which may lead to a memory leak.

For example, suppose I receive the last message for key ('USA', 'Male', 2018) after 23 hours 57 minutes, and no more messages arrive for this key afterwards. Then the function is never invoked again for this key, the TTL check for its state is never triggered, and the state stays in memory forever.

This article mentions using a timer: "The idea is to register a timer with the TTL per state value and access. When the timer elapses, the state can be cleared if no other state access happened since the timer was registered." But I am not able to figure out how to do that.

I'm thinking about using a ProcessFunction, which has an onTimer() method. My plan is to register a ProcessingTimeTimer in its open() method and delete the state in onTimer(), but I don't know whether this timer is also triggered passively, meaning it won't fire after 24 hours if there is no invocation of the ProcessFunction.

Upvotes: 1

Views: 2084

Answers (1)

David Anderson

Reputation: 43697

Using a ProcessFunction for this is a fine idea. The ProcessFunction will necessarily hold the keyed state in question, and will be aware of all reads and writes to the state, which you can use to create and delete timers in whatever way makes sense for your application.

Timers are keyed (in the same way that the state is keyed), and a processing time timer will fire on schedule regardless of stream activity or inactivity for that key (or other keys, for that matter). If the job is down for some reason at the scheduled time, processing time timers that should have fired during the outage will fire when the job recovers.
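To make the "clear only if nothing touched the state since the timer was registered" logic concrete, here is a minimal plain-Java model of it. This is a sketch, not Flink code: the class `TimerTtlModel`, its maps, and `advanceTo()` are hypothetical stand-ins for keyed state and the timer service. In a real job the same two methods would be the `processElement()` and `onTimer()` of a `KeyedProcessFunction`, with the timer registered through `ctx.timerService().registerProcessingTimeTimer(...)`. Note that the timer would be registered from `processElement()` rather than `open()`, because timers belong to the current key and `open()` has no current key.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java model of keyed state plus processing-time timers (NOT the Flink API). */
final class TimerTtlModel {
    static final long TTL_MS = 24L * 60 * 60 * 1000; // 24 hours

    // Hypothetical stand-ins for two pieces of keyed state:
    // the value itself and the timestamp at which it should expire.
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Long> expiry = new HashMap<>();

    // Hypothetical stand-in for the timer service: fire time -> keys due at that time.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    /** Mirrors processElement(): update the state, then arm a timer TTL_MS from now. */
    void processElement(String key, long now) {
        counts.merge(key, 1L, Long::sum);
        long fireAt = now + TTL_MS;
        expiry.put(key, fireAt); // remember which timer is the "current" one
        timers.computeIfAbsent(fireAt, t -> new HashSet<>()).add(key);
    }

    /** Mirrors onTimer(): clear the state only if no later access re-armed the timer. */
    void onTimer(String key, long timestamp) {
        Long expected = expiry.get(key);
        if (expected != null && expected == timestamp) {
            counts.remove(key);
            expiry.remove(key);
        }
        // Otherwise this is a stale timer from an earlier access; ignore it.
    }

    /** Stand-in for the runtime: fire every timer due at or before 'now', in time order. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    boolean hasState(String key) {
        return counts.containsKey(key);
    }

    public static void main(String[] args) {
        TimerTtlModel model = new TimerTtlModel();
        long lastMessage = 23L * 60 * 60 * 1000 + 57L * 60 * 1000; // 23 h 57 min

        model.processElement("USA", 0L);
        model.processElement("USA", lastMessage);

        model.advanceTo(TTL_MS); // the first timer fires but is stale
        System.out.println("state after first timer: " + model.hasState("USA"));

        model.advanceTo(lastMessage + TTL_MS); // the second timer fires with no new input
        System.out.println("state after second timer: " + model.hasState("USA"));
    }
}
```

The comparison in `onTimer()` is what makes stale timers harmless, so older timers do not have to be deleted when a new one is registered. If the state should instead go away a fixed 24 hours after it is first created, the timer would be armed only when the state does not exist yet, and `onTimer()` could clear unconditionally.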

Upvotes: 2
