Avinash

Reputation: 603

Flink Session Window: Count events and trigger on specific event counts

I'm using a custom trigger that fires when a window contains 5 events. This works well with a TumblingEventTimeWindow and a SlidingTimeWindow, since both have a fixed window start and end. The logic doesn't work with session windows, though, because each event creates its own window, which is later merged with others. I'm using a reducing state to count the events.

How can I count events in a session window in this situation?

Code Gist: https://gist.github.com/thepythonista/4ad2f8c41f56aaea6ebf13fd9392c4bc

Additional problem: I was able to pass a ReducingStateDescriptor to the OnMergeContext's mergePartitionedState method, and the events were counted correctly. But when I pass a ValueStateDescriptor to mergePartitionedState, I get a compile-time error:

ReducingStateDescriptor<Long> eventCounterDescriptor = new ReducingStateDescriptor<>("COUNT", new Sum(), LongSerializer.INSTANCE);
ValueStateDescriptor<Boolean> exitEventDescriptor = new ValueStateDescriptor<>("EXIT_EVENT", Boolean.class);

@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
    // Compiles: ReducingState is a MergingState.
    ctx.mergePartitionedState(eventCounterDescriptor);
    // Won't compile: ValueState is not a MergingState (see the error below).
    ctx.mergePartitionedState(exitEventDescriptor);
    // Re-register the timer for the end of the merged window.
    ctx.registerEventTimeTimer(window.maxTimestamp());
}


Error:(133, 16) java: method mergePartitionedState in interface org.apache.flink.streaming.api.windowing.triggers.Trigger.OnMergeContext cannot be applied to given types;
  required: org.apache.flink.api.common.state.StateDescriptor<S,?>
  found: org.apache.flink.api.common.state.ValueStateDescriptor<java.lang.Boolean>
  reason: inference variable S has incompatible bounds
    equality constraints: org.apache.flink.api.common.state.ValueState<java.lang.Boolean>
    upper bounds: org.apache.flink.api.common.state.MergingState<?,?>

Upvotes: 1

Views: 1163

Answers (1)

Avinash

Reputation: 603

All I had to do was call the OnMergeContext's mergePartitionedState function in onMerge, so that the counts of the merged windows are combined. I've updated the gist.

@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
    ctx.mergePartitionedState(eventCounterDescriptor);
    ctx.registerEventTimeTimer(window.maxTimestamp());
}
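
To illustrate what merging the count state achieves, here is a minimal plain-Java model of the idea. It is not the Flink API: SessionCountSketch, Session, addEvent and countFirings are hypothetical names made up for this sketch. It assumes that touching windows merge and that the count is reset after firing. Each event opens its own window, overlapping windows are merged, and their counts are summed. The sketch also combines a Boolean flag with OR, to show the kind of associative combine a reducing state can express. A plain value state has no such merge rule, which is consistent with the MergingState upper bound in the compiler error from the question.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session-window merging; none of these names are Flink classes.
final class SessionCountSketch {

    /** One session window plus the per-window state a trigger would keep. */
    static final class Session {
        long start;       // window start
        long end;         // window end (start of last event + gap)
        long count;       // models a summing reducing state
        boolean sawExit;  // models a flag combined with logical OR

        Session(long start, long end, long count, boolean sawExit) {
            this.start = start;
            this.end = end;
            this.count = count;
            this.sawExit = sawExit;
        }

        /** Touching windows count as overlapping in this model. */
        boolean overlaps(Session other) {
            return start <= other.end && other.start <= end;
        }

        /** Fold another window and its state into this one. */
        void absorb(Session other) {
            start = Math.min(start, other.start);
            end = Math.max(end, other.end);
            count += other.count;      // counts are summed on merge
            sawExit |= other.sawExit;  // flags are ORed on merge
        }
    }

    /** Adds one event as its own window, merges overlaps, returns the merged session. */
    static Session addEvent(List<Session> sessions, long timestamp, long gap, boolean isExit) {
        Session merged = new Session(timestamp, timestamp + gap, 1L, isExit);
        List<Session> untouched = new ArrayList<>();
        for (Session s : sessions) {
            if (s.overlaps(merged)) {
                merged.absorb(s);
            } else {
                untouched.add(s);
            }
        }
        sessions.clear();
        sessions.addAll(untouched);
        sessions.add(merged);
        return merged;
    }

    /** Counts how often a merged session reaches the threshold; resets the count after each firing. */
    static int countFirings(long[] timestamps, long gap, long threshold) {
        List<Session> sessions = new ArrayList<>();
        int firings = 0;
        for (long t : timestamps) {
            Session s = addEvent(sessions, t, gap, false);
            if (s.count >= threshold) {
                firings++;
                s.count = 0L;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        System.out.println(countFirings(new long[] {0, 3, 6, 9, 12}, 5L, 5L));
    }
}
```

Without the summing step in absorb, every merged session would restart at a count of 1, which matches the behaviour described in the question before mergePartitionedState was called.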

Upvotes: 1
