I have recently been studying ProcessWindowFunction in Flink's new release. The documentation says that ProcessWindowFunction supports both global state and per-window state. I used the Scala API to try it out. So far I can get the global state working, but I have had no luck with the window state. I am processing system logs and counting the number of logs keyed by hostname and severity level, and I would like to calculate the difference in log count between two adjacent windows. Here is my code implementing ProcessWindowFunction:
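import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class LogProcWindowFunction extends ProcessWindowFunction[LogEvent, LogEvent, Tuple, TimeWindow] {
  // Descriptor for the per-window ValueState
  private final val valueStateWindowDesc = new ValueStateDescriptor[Long](
    "windowCounters",
    createTypeInformation[Long])
  // Descriptor for the global (per-key, shared across windows) ReducingState
  private final val reducingStateGlobalDesc = new ReducingStateDescriptor[Long](
    "globalCounters",
    new SumReduceFunction(),
    createTypeInformation[Long])

  override def process(key: Tuple, context: Context, elements: Iterable[LogEvent], out: Collector[LogEvent]): Unit = {
    // Obtain the per-window and the global state handles for the current key
    val valueWindowState = context.windowState.getState(valueStateWindowDesc)
    val reducingGlobalState = context.globalState.getReducingState(reducingStateGlobalDesc)
    val latestWindowCount = valueWindowState.value()
    println(s"lastWindowCount: $latestWindowCount ......")
    val latestGlobalCount = if (reducingGlobalState.get() == null) 0L else reducingGlobalState.get()
    // Compute the necessary statistics and determine if we should launch an alarm
    val eventCount = elements.size
    // Update the related state
    valueWindowState.update(eventCount.toLong)
    reducingGlobalState.add(eventCount.toLong)
    for (elem <- elements) {
      out.collect(elem)
    }
  }
}

// SumReduceFunction is not shown in the question; this is an assumed definition that sums counts
class SumReduceFunction extends ReduceFunction[Long] {
  override def reduce(a: Long, b: Long): Long = a + b
}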
I always get 0 from the window state instead of the count that the previous window stored. I've been struggling with this problem for several days. Can someone please help me figure it out? Thanks.
The scope of per-window state is a single window instance. In your process method above, every call sees a new window whose state is still empty (an empty ValueState returns null, which a Scala Long reads as 0), so latestWindowCount is always zero.
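To make the scoping concrete, here is a plain-Scala model, not Flink code, of how the two kinds of state are keyed: per-window state lives under the pair (key, window), while global state lives under the key alone. StateScopeModel and SimpleWindow are hypothetical names that only stand in for Flink's internal bookkeeping.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's TimeWindow: the interval [start, end) in milliseconds.
final case class SimpleWindow(start: Long, end: Long)

// Plain-Scala model (NOT the Flink API) of how per-window and global state are scoped.
final class StateScopeModel {
  // Per-window state: one slot per (key, window) pair.
  private val windowState = mutable.Map.empty[(String, SimpleWindow), Long]
  // Global state: one slot per key, shared by every window of that key.
  private val globalState = mutable.Map.empty[String, Long]

  /** Mirrors the question's process(): returns (windowStateBefore, globalStateBefore). */
  def process(key: String, window: SimpleWindow, count: Long): (Long, Long) = {
    val windowBefore = windowState.getOrElse((key, window), 0L) // empty for a window seen for the first time
    val globalBefore = globalState.getOrElse(key, 0L)
    windowState((key, window)) = count
    globalState(key) = globalBefore + count
    (windowBefore, globalBefore)
  }
}
```

Calling process for two consecutive windows of the same key reads 0 from the window slot both times, while the global slot returns the first window's count on the second call. Only when the same window is processed again (as with a late firing) does its window slot return the value stored earlier.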
For an ordinary window that fires only once, per-window state is of no use. It only pays off when the same window fires more than once, for example late firings when allowed lateness is configured. If you are trying to remember something from one window to the next, use the global state (context.globalState) instead.
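A minimal sketch of the global-state approach, applied to the question's goal of comparing adjacent windows, could look like this. It assumes Flink's Scala DataStream API and a String key (for example hostname and severity joined into one string); LogEvent here is a hypothetical stand-in for the question's class, and CountDiffWindowFunction and the state name are illustrative, not part of Flink.

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical event type standing in for the question's LogEvent.
final case class LogEvent(host: String, severity: String)

// Hypothetical function: emits (key, count in this window, change versus the key's previous window).
class CountDiffWindowFunction
    extends ProcessWindowFunction[LogEvent, (String, Long, Long), String, TimeWindow] {

  // Descriptor for GLOBAL state: keyed only by the key, so it survives from one window to the next.
  private val previousCountDesc =
    new ValueStateDescriptor[java.lang.Long]("previousWindowCount", classOf[java.lang.Long])

  override def process(
      key: String,
      context: Context,
      elements: Iterable[LogEvent],
      out: Collector[(String, Long, Long)]): Unit = {
    val previousCount = context.globalState.getState(previousCountDesc)
    val current = elements.size.toLong
    // value() is null until an earlier window of this key has stored a count.
    val previous = Option(previousCount.value()).map(_.longValue).getOrElse(0L)
    out.collect((key, current, current - previous))
    previousCount.update(current) // remembered for the next window of the same key
  }
}
```

Because global state is scoped to the key only, "previous" means the last window that actually fired for that key: if a key has no events in some window, that window never fires and the difference is taken against an older one. The window operator also never purges global state, so for keys that come and go you may want to clear it yourself or use state TTL where your Flink version supports it.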
For an example of using per-window state to remember data to use in late firings, see slides 13-19 in Flink's advanced window training.
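Along the same lines, here is a hedged sketch of per-window state used across late firings: each window remembers the count it reported at its previous firing and also emits the increase since then. It assumes the window is configured with allowedLateness so it can fire again; LogEvent and LateFiringDeltaFunction are hypothetical names. Per-window state is not removed automatically, so the sketch releases it in clear(), which the window operator calls when the window is purged.

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical event type standing in for the question's LogEvent.
final case class LogEvent(host: String, severity: String)

// Hypothetical function for windows that may fire more than once (e.g. with allowedLateness).
class LateFiringDeltaFunction
    extends ProcessWindowFunction[LogEvent, (String, Long, Long), String, TimeWindow] {

  // Descriptor for PER-WINDOW state: scoped to the (key, window) pair.
  private val countAtLastFiringDesc =
    new ValueStateDescriptor[java.lang.Long]("countAtLastFiring", classOf[java.lang.Long])

  override def process(
      key: String,
      context: Context,
      elements: Iterable[LogEvent],
      out: Collector[(String, Long, Long)]): Unit = {
    val countAtLastFiring = context.windowState.getState(countAtLastFiringDesc)
    val total = elements.size.toLong // all elements of this window so far, including late ones
    // null on the window's first firing; set on every later firing of the same window.
    val before = Option(countAtLastFiring.value()).map(_.longValue).getOrElse(0L)
    // (key, total so far in this window, events added since this window last fired)
    out.collect((key, total, total - before))
    countAtLastFiring.update(total)
  }

  // Per-window state must be cleaned up here when the window is purged.
  override def clear(context: Context): Unit =
    context.windowState.getState(countAtLastFiringDesc).clear()
}
```

On the first (on-time) firing nothing has been stored yet, so the whole count is reported as new; each late firing then reports only the events that arrived since the previous firing of that same window.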