yaarix

Reputation: 500

Flink window state size and state management

After reading Flink's documentation and searching around, I couldn't entirely understand how Flink handles state in its windows. Let's say I have an hourly tumbling window with an aggregation function that accumulates messages into some Java POJO or Scala case class. Will the size of that window be tied to the number of events entering the window in a single hour, or only to the POJO/case class, since I'm accumulating the events into that object? (E.g., if I count 10,000 messages into an integer, will the state size be close to 10,000 * message size, or the size of an int?)

Also, if I'm using POJOs or case classes, does Flink handle the state for me (spilling to disk if memory is exhausted, saving state at checkpoints, etc.), or must I use Flink's state objects for that?

Thanks for your help!

Upvotes: 7

Views: 2487

Answers (1)

Fabian Hueske

Reputation: 18987

The state size of a window depends on the type of function that you apply. If you apply a ReduceFunction or AggregateFunction, arriving data is immediately aggregated and the window only holds the aggregated value. If you apply a ProcessWindowFunction or WindowFunction, Flink collects all input records and applies the function when time (event or processing time depending on the window type) passes the window's end time.
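To make the size difference concrete, here is a toy model in plain Java of the per-window state for the question's example of counting 10,000 messages. It is not Flink's API, and the class and method names are hypothetical. The incremental variant keeps a single `long`, as a ReduceFunction or AggregateFunction would; the buffering variant keeps every record until the window fires, as a plain ProcessWindowFunction or WindowFunction would. It ignores serialization overhead, keys, and timers, so treat it only as a sketch of how the state grows.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Flink's API): what a single window keeps in state
// under the two kinds of window functions.
final class WindowStateModel {

    // Incremental style (like ReduceFunction / AggregateFunction):
    // each arriving record is folded into one accumulator and then dropped.
    static final class CountingWindow {
        private long count = 0L;          // the only per-window state

        void add(String msg) {
            count++;                      // msg itself is not retained
        }

        long storedElements() {
            return 0L;                    // no raw records are kept
        }

        long result() {
            return count;
        }
    }

    // Buffering style (like a plain ProcessWindowFunction / WindowFunction):
    // every record is kept until the window fires.
    static final class BufferingWindow {
        private final List<String> elements = new ArrayList<>();

        void add(String msg) {
            elements.add(msg);            // state grows with every record
        }

        long storedElements() {
            return elements.size();
        }

        long result() {
            return elements.size();       // computed over the buffered records at fire time
        }
    }

    public static void main(String[] args) {
        CountingWindow incremental = new CountingWindow();
        BufferingWindow buffering = new BufferingWindow();
        for (int i = 0; i < 10_000; i++) {
            String msg = "msg-" + i;
            incremental.add(msg);
            buffering.add(msg);
        }
        // Same result, very different amount of retained state.
        System.out.println("incremental: result=" + incremental.result()
                + ", stored records=" + incremental.storedElements());
        System.out.println("buffering:   result=" + buffering.result()
                + ", stored records=" + buffering.storedElements());
    }
}
```

Running `main` prints a result of 10000 for both windows, with 0 stored records for the incremental one and 10000 for the buffering one.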

You can also combine both types of functions, i.e., have an AggregateFunction followed by a ProcessWindowFunction. In that case, arriving records are immediately aggregated, and when the window closes, the aggregation result is passed as a single value to the ProcessWindowFunction. This is useful because you get incremental aggregation (from the ReduceFunction / AggregateFunction) as well as access to window metadata such as the start and end timestamps (from the ProcessWindowFunction).
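The combination can be sketched the same way. This is again a plain-Java model with hypothetical names (`Aggregate`, `SumCount`, `Average`, `process`, `runWindow`). The `Aggregate` interface only mirrors the `createAccumulator` / `add` / `getResult` methods of Flink's `AggregateFunction` and is not the real interface; in actual Flink code the two functions would be passed together to `WindowedStream#aggregate(aggregateFunction, processWindowFunction)`.

```java
import java.util.List;

// Toy model (NOT Flink code): incremental aggregation whose single result is
// handed, together with window metadata, to a final "process" step.
final class AggregateThenProcess {

    // Hypothetical stand-in shaped like Flink's AggregateFunction<IN, ACC, OUT>;
    // merge() is omitted for brevity.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
    }

    // Accumulator: the only state kept per window while records arrive.
    static final class SumCount {
        long sum;
        long count;
    }

    static final class Average implements Aggregate<Long, SumCount, Double> {
        public SumCount createAccumulator() { return new SumCount(); }

        public SumCount add(Long value, SumCount acc) {
            acc.sum += value;
            acc.count++;
            return acc;
        }

        public Double getResult(SumCount acc) {
            return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
        }
    }

    // Hypothetical "process" step: receives ONE pre-aggregated value plus the
    // window's start/end timestamps, like a ProcessWindowFunction whose
    // input iterable holds a single element.
    static String process(long windowStart, long windowEnd, double average) {
        return "window [" + windowStart + ", " + windowEnd + "): avg=" + average;
    }

    // Simulates one window: aggregate on arrival, then process when it closes.
    static String runWindow(long start, long end, List<Long> records) {
        Average agg = new Average();
        SumCount acc = agg.createAccumulator();
        for (Long r : records) {
            acc = agg.add(r, acc);        // record is folded in, not stored
        }
        return process(start, end, agg.getResult(acc));
    }

    public static void main(String[] args) {
        System.out.println(runWindow(0L, 3_600_000L, List.of(10L, 20L, 30L)));
    }
}
```

With records 10, 20 and 30 in a one-hour window starting at 0, `main` prints `window [0, 3600000): avg=20.0`. While the window is open, only the two `long` fields of `SumCount` are held.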

How the state is managed depends on the chosen state backend. If you configure the FsStateBackend, all local state is kept on the heap of the TaskManager, and the JVM process is killed with an OutOfMemoryError if the state grows too large. If you configure the RocksDBStateBackend, state is spilled to disk. This comes with de/serialization costs for every state access but gives you much more storage for state.
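The per-access de/serialization cost can be illustrated with one more toy model. It is not how either backend is implemented; it only contrasts state held as live heap objects (roughly the heap-based backend's situation) with state held as serialized bytes, where every read-modify-write pays a round trip. The names `HeapStore`, `SerializedStore` and `serdeCalls` are hypothetical. In a real job the backend is selected through the cluster configuration or `StreamExecutionEnvironment#setStateBackend`, not by code like this.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Toy model (NOT a real state backend): live heap objects vs. serialized bytes.
final class StateAccessModel {

    // Heap-style store: values are live objects; an update is a plain write.
    static final class HeapStore {
        private final Map<String, long[]> state = new HashMap<>();

        long increment(String key) {
            long[] counter = state.computeIfAbsent(key, k -> new long[1]);
            return ++counter[0];
        }
    }

    // Serialized-style store: values live as byte[], so each update must
    // deserialize the old value and serialize the new one.
    static final class SerializedStore {
        private final Map<String, byte[]> state = new HashMap<>();
        long serdeCalls = 0L;

        long increment(String key) {
            byte[] bytes = state.get(key);
            long current = 0L;
            if (bytes != null) {
                current = ByteBuffer.wrap(bytes).getLong();  // deserialize
                serdeCalls++;
            }
            long updated = current + 1;
            byte[] out = ByteBuffer.allocate(Long.BYTES).putLong(updated).array();  // serialize
            serdeCalls++;
            state.put(key, out);
            return updated;
        }
    }

    public static void main(String[] args) {
        HeapStore heap = new HeapStore();
        SerializedStore serialized = new SerializedStore();
        for (int i = 0; i < 1_000; i++) {
            heap.increment("window-1");
            serialized.increment("window-1");
        }
        System.out.println("heap count=" + heap.increment("window-1"));
        System.out.println("serialized count=" + serialized.increment("window-1")
                + ", serde calls=" + serialized.serdeCalls);
    }
}
```

After 1,001 increments the serialized store reports 2,001 serde calls (one serialization for the first write, then a deserialize plus a serialize for each later one), while the heap store performs none.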

Upvotes: 13
