Ignatius J. Reilly

Reputation: 89

apache-flink: sliding window in output

I'm writing a small application to understand sliding windows in Apache Flink, with input read from an Apache Kafka topic:

// Split each Kafka record on commas and build a tuple of (id, value, event date).
// f is a date format defined elsewhere in the program.
DataStream<Tuple3<String, Integer, Date>> parsedStream = stream
    .map((line) -> {
       String[] cells = line.split(",");
       return new Tuple3<>(cells[1], Integer.parseInt(cells[4]), f.parse(cells[2]));
    });

DataStream<Tuple3<String, Integer, Date>> parsedStreamWithTSWM = parsedStream
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Date>>(Time.minutes(1)) {

            @Override
            public long extractTimestamp(Tuple3<String, Integer, Date> element) {
                return element.f2.getTime();
            }
        });

// Sum the values per window and per id
DataStream<Tuple3<String, Integer, Date>> AggStream = parsedStreamWithTSWM
    .keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(1)))
    .sum(1);

AggStream.print();

Is it possible to improve my output (AggStream.print();) by adding the details of the window that produced each aggregated value?

$ tail -f flink-chapichapo-jobmanager-0.out
(228035740000002,300,Fri Apr 07 14:42:00 CEST 2017)
(228035740000000,28,Fri Apr 07 14:42:00 CEST 2017)
(228035740000002,300,Fri Apr 07 14:43:00 CEST 2017)
(228035740000000,27,Fri Apr 07 14:43:00 CEST 2017)
(228035740000002,300,Fri Apr 07 14:44:00 CEST 2017)
(228035740000000,26,Fri Apr 07 14:44:00 CEST 2017)
(228035740000001,27,Fri Apr 07 14:44:00 CEST 2017)
(228035740000002,300,Fri Apr 07 14:45:00 CEST 2017)
(228035740000000,25,Fri Apr 07 14:45:00 CEST 2017)

Thank you in advance

Upvotes: 1

Views: 943

Answers (1)

Dawid Wysakowicz

Reputation: 3422

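You can use the generic apply function instead of sum. It takes a WindowFunction, which receives the window being evaluated along with its elements, so you can include the window details in whatever you emit.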

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
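As a rough sketch of the idea, the snippet below mirrors the shape of apply(key, window, input, out) using plain Java stand-ins so it runs without Flink on the classpath. SimpleWindow, SimpleWindowFunction and SUM_WITH_WINDOW are hypothetical names invented for this illustration, not Flink classes. In the actual job you would implement WindowFunction with TimeWindow as the window type (with keyBy(0) the key type is Tuple), read the bounds with window.getStart() and window.getEnd(), and pass the function to .apply(...) in place of .sum(1).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowInfoSketch {

    // Hypothetical stand-in for Flink's TimeWindow: covers [start, end) in epoch milliseconds.
    static final class SimpleWindow {
        final long start;
        final long end;

        SimpleWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Hypothetical stand-in mirroring the shape of WindowFunction.apply(key, window, input, out).
    interface SimpleWindowFunction<IN, OUT, KEY> {
        void apply(KEY key, SimpleWindow window, Iterable<IN> input, List<OUT> out);
    }

    // Sums the values of one window and emits "key,sum,windowStart,windowEnd".
    static final SimpleWindowFunction<Integer, String, String> SUM_WITH_WINDOW =
        (key, window, input, out) -> {
            int sum = 0;
            for (int value : input) {
                sum += value;
            }
            out.add(key + "," + sum + "," + window.start + "," + window.end);
        };

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // One 30-minute window holding three values for a single key.
        SimpleWindow window = new SimpleWindow(0L, 30 * 60 * 1000L);
        SUM_WITH_WINDOW.apply("228035740000000", window, Arrays.asList(10, 8, 10), out);
        System.out.println(out.get(0));
    }
}
```

The point of the design is that the aggregation and the window metadata are available in the same call, so the emitted record can carry both. The output type no longer has to match the input type, which is why a plain string is emitted here.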

See the Flink documentation on window functions for details.
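Once the window bounds are printed, it helps to know which windows to expect. With a size of 30 minutes and a slide of 1 minute, every element falls into 30 overlapping windows, which is why each key produces a result every minute. The sketch below reproduces that assignment arithmetic in plain Java, assuming a window offset of 0; SlidingWindowStarts and windowStarts are hypothetical names for this illustration, not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowStarts {

    // Returns the start timestamps (ms) of every sliding window containing the
    // given timestamp, newest window first. Assumes a window offset of 0.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An element stamped 45 minutes and 5 seconds after the epoch.
        List<Long> starts = windowStarts(45 * minute + 5_000L, 30 * minute, minute);
        System.out.println("windows containing the element: " + starts.size());
        System.out.println("newest window starts at minute: " + starts.get(0) / minute);
    }
}
```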

Upvotes: 1
