Reputation: 1160
I'm calculating a count (summing 1) over a time window as follows:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.sum("count")
I would like to add the window start time as a key field too, so the result would be something like:
key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50
So essentially I want to aggregate the count per window. The end goal is to draw a histogram of these windows.
How can I add the start of the window as a field in the key? And following that, can I align the windows to 00s or 30s in this case? Is that possible?
Upvotes: 1
Views: 3927
Reputation: 23
You can use the aggregate method instead of sum. As the second parameter of aggregate, pass a function that implements WindowFunction or extends ProcessWindowFunction.
I am using Flink 1.4.0 and recommend ProcessWindowFunction, like:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.aggregate(new Count(), new MyProcessWindowFunction());
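public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long, Integer>, Tuple, TimeWindow>
{
    @Override
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long, Integer>> collector) throws Exception
    {
        // The pre-aggregated count arrives as the single element of the Iterable.
        Integer count = iterable.iterator().next();
        // Emit the window start timestamp together with the count.
        // context.currentProcessingTime() is also available here if needed.
        collector.collect(new Tuple2<>(context.window().getStart(), count));
    }
}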
Upvotes: 2
Reputation: 18987
The apply() method of the WindowFunction provides a Window object, which is a TimeWindow if you use keyBy().timeWindow(). The TimeWindow object has two methods, getStart() and getEnd(), which return the timestamps of the window's start and end, respectively.
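On the alignment part of the question: as far as I know, Flink's tumbling time windows are aligned to the epoch by default, so 30-second windows should already start at 00s and 30s of each minute. The following is a minimal plain-Java sketch (no Flink dependency) of how such a window start can be derived from a timestamp and turned into the key format from the question. The class and method names are hypothetical and only for illustration.

```java
import java.time.Instant;

final class WindowAlignment {
    // Start of the epoch-aligned tumbling window that contains timestampMs.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Builds a key in the shape asked for in the question (time is rendered in UTC).
    static String key(String videoId, String userId, long timestampMs, long windowSizeMs) {
        Instant start = Instant.ofEpochMilli(windowStart(timestampMs, windowSizeMs));
        return "videoId=" + videoId + ",userId=" + userId + ",time=" + start;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2016-09-16T17:01:47Z").toEpochMilli();
        // prints videoId=123,userId=234,time=2016-09-16T17:01:30Z
        System.out.println(key("123", "234", ts, 30_000L));
    }
}
```

In a real job the value passed to Instant.ofEpochMilli would be what getStart() returns, so no manual rounding is needed there.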
At the moment it is not possible to use the sum() aggregation together with a WindowFunction. You need to do something like:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.apply(new MySumReduceFunction(), new MyWindowFunction());
MySumReduceFunction implements the ReduceFunction interface and computes the sum by incrementally aggregating the elements that arrive in the window. MyWindowFunction implements WindowFunction. It receives the aggregated value through the Iterable parameter and enriches the value with the timestamp obtained from the TimeWindow parameter.
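To illustrate the division of labour between the two functions, here is a rough plain-Java simulation, not Flink code. Stage 1 plays the role of MySumReduceFunction and keeps only one running total per window; stage 2 plays the role of MyWindowFunction and attaches the window start to the final total. All names in the sketch are hypothetical, and the event layout ({timestampMs, count} pairs for a single key) is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class ReduceThenEnrichSketch {
    // Stage 1 (role of the reduce function): combine two partial counts.
    static final BinaryOperator<Integer> SUM = Integer::sum;

    // Stage 2 (role of the window function): attach key and window start to the total.
    static String enrich(String key, long windowStartMs, int total) {
        return key + ",windowStart=" + windowStartMs + " -> " + total;
    }

    // Runs both stages over events given as {timestampMs, count} for one key.
    static List<String> run(String key, long windowSizeMs, long[][] events) {
        Map<Long, Integer> partial = new LinkedHashMap<>();
        for (long[] e : events) {
            long start = e[0] - Math.floorMod(e[0], windowSizeMs);
            // Only the running total per window is stored, not the individual events.
            partial.merge(start, (int) e[1], SUM);
        }
        List<String> out = new ArrayList<>();
        partial.forEach((start, total) -> out.add(enrich(key, start, total)));
        return out;
    }

    public static void main(String[] args) {
        long[][] events = { {1_000, 1}, {29_999, 1}, {30_000, 1} };
        run("videoId=123,userId=234", 30_000L, events).forEach(System.out::println);
    }
}
```

With the three sample events, the first two fall into the window starting at 0 and the third into the window starting at 30000, so the sketch emits totals of 2 and 1.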
Upvotes: 4