Reputation: 603
I'm using a custom trigger that fires when there are 5 events in the window. This works well with TumblingEventTimeWindow and SlidingTimeWindow, since both have a fixed window start and end. But the logic doesn't work with session windows, because each event creates its own window, and these windows are merged later. I'm using a reducing state to count the events.
How can I count events in a session window, given that its windows are merged?
Code Gist: https://gist.github.com/thepythonista/4ad2f8c41f56aaea6ebf13fd9392c4bc
Additional problem: I was able to pass a ReducingStateDescriptor to the OnMergeContext's mergePartitionedState method, and the events were counted correctly. But when I tried to call mergePartitionedState with a ValueStateDescriptor, I got a compile-time error:
ReducingStateDescriptor<Long> eventCounterDescriptor = new ReducingStateDescriptor<>("COUNT", new Sum(), LongSerializer.INSTANCE);
ValueStateDescriptor<Boolean> exitEventDescriptor = new ValueStateDescriptor<>("EXIT_EVENT", Boolean.class);

@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
    ctx.mergePartitionedState(eventCounterDescriptor); // compiles: reducing state can be merged
    ctx.mergePartitionedState(exitEventDescriptor);    // won't compile: value state is not a MergingState
    ctx.registerEventTimeTimer(window.maxTimestamp());
}
Error:(133, 16) java: method mergePartitionedState in interface org.apache.flink.streaming.api.windowing.triggers.Trigger.OnMergeContext cannot be applied to given types;
required: org.apache.flink.api.common.state.StateDescriptor<S,?>
found: org.apache.flink.api.common.state.ValueStateDescriptor<java.lang.Boolean>
reason: inference variable S has incompatible bounds
equality constraints: org.apache.flink.api.common.state.ValueState<java.lang.Boolean>
upper bounds: org.apache.flink.api.common.state.MergingState<?,?>
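The "upper bounds" line of the error says that mergePartitionedState only accepts descriptors whose state type is a MergingState, and ValueState is not one. One possible workaround, which is not from the original post, is to keep the exit flag in a reducing state whose reduce function is a logical OR, so the merged window is flagged if any of its parts was. The sketch below is plain Java that models only that merge semantic. The class name and `mergeFlags` are hypothetical, and in Flink the same OR would presumably be supplied as a ReduceFunction<Boolean> to a ReducingStateDescriptor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ExitFlagMergeSketch {
    // Logical OR is associative and commutative, so it can serve as a reduce function.
    static final BinaryOperator<Boolean> OR = (a, b) -> a || b;

    // Hypothetical helper: folds the flags of the windows being merged into one flag.
    static boolean mergeFlags(List<Boolean> perWindowFlags) {
        return perWindowFlags.stream().reduce(false, OR);
    }

    public static void main(String[] args) {
        // One of the three merged windows saw the exit event.
        System.out.println(mergeFlags(Arrays.asList(false, true, false))); // prints "true"
        // None of the merged windows saw it.
        System.out.println(mergeFlags(Arrays.asList(false, false)));       // prints "false"
    }
}
```

Because OR gives the same result in any merge order, the flag does not depend on the order in which the session windows happen to be merged.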
Upvotes: 1
Views: 1163
Reputation: 603
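All I had to do was call the OnMergeContext's mergePartitionedState method in onMerge, so that the counts of the windows being merged are combined. I've updated the gist.
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
    // Combine the per-window counts into the merged window's reducing state.
    ctx.mergePartitionedState(eventCounterDescriptor);
    // Re-register the timer for the end of the merged window.
    ctx.registerEventTimeTimer(window.maxTimestamp());
}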
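To illustrate why the merge matters: with session windows every event starts in its own window, so each per-window count stays at 1 until the counts are summed on merge. The following is a minimal plain-Java sketch of that arithmetic, not the Flink API. The class and method names are hypothetical, and the threshold of 5 is taken from the question.

```java
import java.util.Arrays;
import java.util.List;

public class SessionCountMergeSketch {
    static final long MAX_COUNT = 5; // threshold from the question

    // Models merging a "sum" reducing state: the merged count is the sum of the parts.
    static long mergeCounts(List<Long> perWindowCounts) {
        long total = 0;
        for (long c : perWindowCounts) {
            total += c;
        }
        return total;
    }

    // Models the threshold check a count-based trigger would apply to the merged count.
    static boolean shouldFire(long mergedCount) {
        return mergedCount >= MAX_COUNT;
    }

    public static void main(String[] args) {
        // Five single-event session windows that end up merged into one.
        long merged = mergeCounts(Arrays.asList(1L, 1L, 1L, 1L, 1L));
        System.out.println(merged + " " + shouldFire(merged)); // prints "5 true"

        // Without merging, each window only ever sees its own count of 1.
        System.out.println(shouldFire(1L)); // prints "false"
    }
}
```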
Upvotes: 1