Reputation: 533
I have a Flink streaming job where I create windows based on some keys and add the incoming data points to them.
.window(SlidingProcessingTimeWindows.of(Time.days(1), Time.minutes(5)))
.trigger(CountTrigger.of(5))
.process(<ProcessWindowFunction>)
I'm using the above piece of code to create a sliding window of size 1 day with a slide of 5 minutes. Also, the count trigger fires the process function once 5 data points have accumulated.
In addition to this, I want to trigger the process function on every slide. That is, until 1 day of data points has accumulated (the window size), CountTrigger should trigger the process function. Once 1 day's worth of points has accumulated and the window slides every 5 minutes, I want to trigger the process function for every data point instead of waiting for CountTrigger to accumulate 5 data points. Can someone please help me with how to do this?
Upvotes: 0
Views: 1054
Reputation: 43499
Be aware that this is going to be pretty painful. Every event is going to be assigned to a total of 288 windows (24 hours / 5 minutes). This means that every event is going to trigger 288 calls to your ProcessWindowFunction.
If you find you need to optimize this, you can probably get better performance with a carefully implemented KeyedProcessFunction.
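The 288 figure can be checked with a small standalone sketch. It has no Flink dependency, and the class and method names (SlidingWindowCount, windowsPerEvent, windowStarts) are hypothetical. It assumes windows aligned to the epoch with no offset and a window size that is an exact multiple of the slide.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: counts the sliding windows per event.
class SlidingWindowCount {

    // Number of sliding windows that contain any single timestamp,
    // assuming the size is an exact multiple of the slide.
    static long windowsPerEvent(Duration size, Duration slide) {
        return size.toMillis() / slide.toMillis();
    }

    // Start timestamps (epoch millis) of every window covering `timestamp`,
    // assuming windows are aligned to the epoch with no offset.
    static List<Long> windowStarts(long timestamp, Duration size, Duration slide) {
        long sizeMs = size.toMillis();
        long slideMs = slide.toMillis();
        // Most recent window start at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slideMs);
        List<Long> starts = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofDays(1);
        Duration slide = Duration.ofMinutes(5);
        System.out.println(windowsPerEvent(size, slide));                          // 288
        System.out.println(windowStarts(1_700_000_000_000L, size, slide).size()); // 288
    }
}
```

With a trigger that fires on every element, each of those 288 windows would invoke the ProcessWindowFunction for each incoming event, which is where the cost comes from.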
Upvotes: 1
Reputation: 5093
Extend org.apache.flink.streaming.api.windowing.triggers.CountTrigger and override the onProcessingTime method. Implement your processing-time logic there, then use this trigger instead of the plain CountTrigger.
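As a rough illustration of the decision logic such a trigger would carry, here is a dependency-free model in plain Java. It is not Flink code: the class CountOrTimeTriggerModel and its Result enum are hypothetical stand-ins, and a real implementation would keep the count in Flink's partitioned trigger state and register a processing-time timer for the window end. If subclassing CountTrigger proves awkward (its constructor may not be accessible from a subclass), the same logic could live in a class that extends Trigger directly.

```java
// Hypothetical plain-Java model of a "count OR processing time" trigger.
// It mirrors the shape of a Flink trigger but uses no Flink classes.
class CountOrTimeTriggerModel {

    // Stand-in for a trigger decision: keep waiting, or evaluate the window.
    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private long count = 0;

    CountOrTimeTriggerModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once per element: fire when maxCount elements have arrived,
    // then reset the counter.
    Result onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Called when a processing-time timer goes off: fire once the clock
    // has reached the window's last timestamp, regardless of the count.
    Result onProcessingTime(long time, long windowMaxTimestamp) {
        return time >= windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }

    public static void main(String[] args) {
        CountOrTimeTriggerModel trigger = new CountOrTimeTriggerModel(5);
        for (int i = 1; i <= 5; i++) {
            System.out.println("element " + i + ": " + trigger.onElement());
        }
        System.out.println("timer at window end: " + trigger.onProcessingTime(1_000L, 1_000L));
    }
}
```

The count path behaves like the plain CountTrigger from the question, and the time path is where the extra per-slide firing would go.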
Upvotes: 1