ChrisATX

Reputation: 129

Apache Flink: Emit output records in Flink based on keyed state even if no input records have arrived for a given aggregation window

I am trying to use Apache Flink for an IoT application. I have a bunch of devices that can be in one of several states. When a device changes state, it emits a message that includes an event timestamp and the state it changed to. For one device, this might look like this:

{Device_id: 1, Event_Timestamp: 9:01, State: STATE_1}

{Device_id: 1, Event_Timestamp: 9:03, State: STATE_2}

For each device, I need to produce a five-minute aggregate of the amount of time the device spent in each state during that five-minute window. To do this, I plan to use keyed state to store the last state update for each device, so that I know what state the device was in at the beginning of the aggregation window. For example, assume the device with id "1" has a keyed state value that says it entered "STATE_2" at 8:58. Then the output of the aggregation for the 9:00 - 9:05 window would look like this (based on the two example events above):

{Device_id: 1, Timestamp: 9:00, State: STATE_1, Duration: 120 seconds}

{Device_id: 1, Timestamp: 9:00, State: STATE_2, Duration: 180 seconds}

My problem is this: Flink will only open a window for a given device_id if there is an event for the window. This means that if a device does not change state for over 5 minutes, no record will enter the stream, so the window will not open. However, I need to emit a record that says that the device spent the entire five minutes in whatever the current state is based on what is stored in the keyed state. For example, Flink should emit a record for 9:05-9:10 that says the device with id "1" spent all 300 seconds in "STATE_2".

Is there a way to output records for the amount of time each device spent in a given state for a five minute aggregation window EVEN IF the state does not change within those five minutes, and, thus, the device sends no events? If not, are there any workarounds I can use to get the output events I need for my application?

Upvotes: 1

Views: 738

Answers (1)

David Anderson

Reputation: 43409

A straightforward way to implement this would be to use a ProcessFunction rather than windowing. You can keep whatever keyed state is convenient for your application, and use timers to trigger producing the periodic reports.
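
To make the idea concrete, here is a minimal sketch of the per-device bookkeeping, written as plain Java so it runs without Flink on the classpath. The class name DeviceStateTracker, its fields, and the at() helper are hypothetical and not part of Flink. In an actual job this logic would presumably live in a KeyedProcessFunction keyed by device id, with the fields held in keyed state, onEvent corresponding to processElement, and onTimer corresponding to the onTimer callback for a timer registered with ctx.timerService().registerEventTimeTimer(...). The sketch assumes events arrive in timestamp order and that each window's timer fires before any event from a later window is processed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for the per-device logic (hypothetical class, not a Flink API).
// In Flink, the fields below would be keyed state, and onTimer would be the
// KeyedProcessFunction callback for a timer registered at each window end.
class DeviceStateTracker {
    static final long WINDOW_MS = 5 * 60 * 1000L;

    private String currentState;  // last state reported by the device
    private long cursorMs;        // time up to which durations are already counted
    private final Map<String, Long> durations = new LinkedHashMap<>();

    DeviceStateTracker(String currentState, long windowStartMs) {
        this.currentState = currentState;
        this.cursorMs = windowStartMs;
    }

    // Helper for readable examples: milliseconds since midnight.
    static long at(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    // Called for each state-change event (assumed to arrive in timestamp order).
    void onEvent(long timestampMs, String newState) {
        // Credit the elapsed time to the state the device is leaving.
        durations.merge(currentState, timestampMs - cursorMs, Long::sum);
        currentState = newState;
        cursorMs = timestampMs;
    }

    // Called when the timer for a window end fires; returns that window's report
    // as a map from state name to milliseconds spent in it.
    Map<String, Long> onTimer(long windowEndMs) {
        // Credit the rest of the window to the current state. If no event
        // arrived, this covers the whole window.
        durations.merge(currentState, windowEndMs - cursorMs, Long::sum);
        Map<String, Long> report = new LinkedHashMap<>(durations);
        durations.clear();
        cursorMs = windowEndMs;
        // A Flink version would register the next timer here,
        // at windowEndMs + WINDOW_MS, so a report is produced every window.
        return report;
    }
}
```

With the question's example (device in STATE_2 at 9:00, events at 9:01 and 9:03), calling onTimer for 9:05 yields 120,000 ms for STATE_1 and 180,000 ms for STATE_2. Calling onTimer again for 9:10 with no events in between yields 300,000 ms for STATE_2. Because the timer, not an incoming event, drives the output, the quiet window still produces a record.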

Upvotes: 5
