AtharvaI

Reputation: 1160

Flink timeWindow get start time

I'm calculating a count (summing 1) over a time window as follows:

mappedUserTrackingEvent
            .keyBy("videoId", "userId")
            .timeWindow(Time.seconds(30))
            .sum("count")

I would also like to add the window start time as a key field, so the result would look something like:

key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50

So essentially, aggregate the count per window. The end goal is to draw a histogram of these windows.

How can I add the start of the window as a field in the key? And following that, is it possible to align the windows to :00 and :30 seconds in this case?

Upvotes: 1

Views: 3927

Answers (2)

yeyunlong

Reputation: 23

You can use the aggregate method instead of sum.
As the second parameter of aggregate, pass a class that implements WindowFunction or extends ProcessWindowFunction.
I am using Flink 1.4.0 and recommend ProcessWindowFunction, for example:

mappedUserTrackingEvent
    .keyBy("videoId", "userId")
    .timeWindow(Time.seconds(30))
    .aggregate(new Count(), new MyProcessWindowFunction());

public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long,  Integer>, Tuple, TimeWindow>
{
    @Override
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long,  Integer>> collector) throws Exception
    {
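        // The pre-aggregated count arrives as the single element of the iterable.
        // context.currentProcessingTime() is also available here if needed.
        Integer count = iterable.iterator().next();
        // Emit the window start (epoch milliseconds) together with the count.
        collector.collect(new Tuple2<>(context.window().getStart(), count));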
    }
}
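
On the alignment part of the question: as far as I know, tumbling time windows are aligned to the epoch by default, so 30-second windows should already start at :00 and :30. Here is a minimal plain-Java sketch of that arithmetic and of turning the value from getStart() into the timestamp format from the question. It has no Flink dependency, it assumes a zero window offset and non-negative timestamps, and WindowAlignment, windowStart and formatUtc are hypothetical names chosen for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: reproduces epoch-aligned
// tumbling-window arithmetic in plain Java.
class WindowAlignment {

    // Fixed pattern so that windows starting at :00 still print their seconds.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss").withZone(ZoneOffset.UTC);

    // Start of the tumbling window of length sizeMs that contains timestampMs.
    // Assumes a non-negative timestamp and a zero window offset.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Renders epoch milliseconds as a UTC timestamp such as 2016-09-16T17:01:30.
    static String formatUtc(long epochMs) {
        return FORMAT.format(Instant.ofEpochMilli(epochMs));
    }

    public static void main(String[] args) {
        long event = Instant.parse("2016-09-16T17:01:47Z").toEpochMilli();
        long start = windowStart(event, 30_000L);
        // Prints: time=2016-09-16T17:01:30
        System.out.println("time=" + formatUtc(start));
    }
}
```

Inside the window function you would pass context.window().getStart() to something like formatUtc instead of recomputing the start yourself.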

Upvotes: 2

Fabian Hueske

Reputation: 18987

The apply() method of the WindowFunction provides a Window object, which is a TimeWindow if you use keyBy().timeWindow(). The TimeWindow object has two methods, getStart() and getEnd() which return the timestamp of the window's start and end, respectively.

At the moment it is not possible to use the sum() aggregation together with a WindowFunction. You need to do something like:

 mappedUserTrackingEvent
        .keyBy("videoId", "userId")
        .timeWindow(Time.seconds(30))
        .apply(new MySumReduceFunction(), new MyWindowFunction());

MySumReduceFunction implements the ReduceFunction interface and computes the sum by incrementally aggregating the elements that arrive in the window. MyWindowFunction implements WindowFunction. It receives the aggregated value through the Iterable parameter and enriches the value with the timestamp obtained from the TimeWindow parameter.
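
To make the division of labour concrete, here is a rough plain-Java stand-in for the two steps. It is not Flink code and does not use any Flink API: WindowedCountSketch, Event and countPerWindow are hypothetical names, the window start is computed by hand assuming epoch-aligned windows with no offset, and everything runs over an in-memory list rather than a stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the reduce + window-function pattern.
// None of these names are Flink API.
class WindowedCountSketch {

    // Hypothetical event: one tracked view of a video by a user.
    static final class Event {
        final String videoId;
        final String userId;
        final long timestampMs;

        Event(String videoId, String userId, long timestampMs) {
            this.videoId = videoId;
            this.userId = userId;
            this.timestampMs = timestampMs;
        }
    }

    // Counts events per (videoId, userId, window start).
    static Map<String, Integer> countPerWindow(List<Event> events, long sizeMs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            // Role of the TimeWindow: start of the epoch-aligned window holding the event
            // (assumes non-negative timestamps and no offset).
            long start = e.timestampMs - (e.timestampMs % sizeMs);
            // Role of the WindowFunction: attach the window start to the key.
            String key = "videoId=" + e.videoId + ",userId=" + e.userId + ",start=" + start;
            // Role of the ReduceFunction: fold each element into the running sum.
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("123", "234", 1_000L),
                new Event("123", "234", 29_999L),
                new Event("123", "234", 30_000L));
        countPerWindow(events, 30_000L)
                .forEach((key, count) -> System.out.println(key + " -> " + count));
    }
}
```

Each entry of the resulting map corresponds to one bar of the histogram. In the real job the incremental sum keeps only one value per window in state, which is the reason for combining a ReduceFunction with a WindowFunction instead of buffering all elements.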

Upvotes: 4
