Reputation: 89
I'm writing a small application to understand sliding windows in Flink (with input data coming from an Apache Kafka topic):
// Split each Kafka record on commas and build an (id, value, date) tuple
DataStream<Tuple3<String, Integer, Date>> parsedStream = stream
    .map((line) -> {
        String[] cells = line.split(",");
        return new Tuple3<>(cells[1], Integer.parseInt(cells[4]), f.parse(cells[2]));
    });

// Use the parsed date as the event timestamp; tolerate events up to 1 minute out of order
DataStream<Tuple3<String, Integer, Date>> parsedStreamWithTSWM = parsedStream
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Date>>(Time.minutes(1)) {
        @Override
        public long extractTimestamp(Tuple3<String, Integer, Date> element) {
            return element.f2.getTime();
        }
    });

// Sum values per window and per id (30-minute windows sliding every minute)
DataStream<Tuple3<String, Integer, Date>> AggStream = parsedStreamWithTSWM
    .keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(1)))
    .sum(1);

AggStream.print();
Is it possible to improve my output (AggStream.print();) by adding the details of the window that produced each aggregated value?
$ tail -f flink-chapichapo-jobmanager-0.out
(228035740000002,300,Fri Apr 07 14:42:00 CEST 2017)
(228035740000000,28,Fri Apr 07 14:42:00 CEST 2017)
(228035740000002,300,Fri Apr 07 14:43:00 CEST 2017)
(228035740000000,27,Fri Apr 07 14:43:00 CEST 2017)
(228035740000002,300,Fri Apr 07 14:44:00 CEST 2017)
(228035740000000,26,Fri Apr 07 14:44:00 CEST 2017)
(228035740000001,27,Fri Apr 07 14:44:00 CEST 2017)
(228035740000002,300,Fri Apr 07 14:45:00 CEST 2017)
(228035740000000,25,Fri Apr 07 14:45:00 CEST 2017)
Thank you in advance
Upvotes: 1
Views: 943
Reputation: 3422
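You can use the generic apply function on the windowed stream. It takes a WindowFunction, which gives you access to the Window object (and therefore the window info) together with the key and the elements.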
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
See the Flink documentation on window functions.
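To make the idea concrete, here is a minimal plain-Java sketch (no Flink dependency) of what such a function would emit: for every id and every sliding window, the sum together with the window's start and end. In a real job, the window argument of apply is a TimeWindow for event-time windows, and its getStart() and getEnd() methods give you these two values directly. Everything named below (SlidingWindowSketch, Event, windowStarts, aggregate) is hypothetical and only for illustration; the sketch assumes a zero window offset and non-negative timestamps, and it ignores watermarks and triggers, so it computes all windows in one batch instead of firing them as event time advances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of a per-id sum over sliding event-time windows.
class SlidingWindowSketch {

    // One parsed record: id, value, and event timestamp in epoch milliseconds.
    static final class Event {
        final String id;
        final int value;
        final long timestampMs;

        Event(String id, int value, long timestampMs) {
            this.id = id;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Start times of all windows [start, start + sizeMs) that contain the timestamp.
    // Assumes a zero offset and a non-negative timestamp.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned start that is not after the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    // Sums values per (window, id) and renders "(id,sum,windowStart,windowEnd)" lines,
    // ordered by window start and then by id.
    static List<String> aggregate(List<Event> events, long sizeMs, long slideMs) {
        Map<Long, Map<String, Integer>> sums = new TreeMap<>();
        for (Event e : events) {
            for (long start : windowStarts(e.timestampMs, sizeMs, slideMs)) {
                sums.computeIfAbsent(start, s -> new TreeMap<>())
                    .merge(e.id, e.value, Integer::sum);
            }
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Integer>> w : sums.entrySet()) {
            long windowEnd = w.getKey() + sizeMs;
            for (Map.Entry<String, Integer> k : w.getValue().entrySet()) {
                out.add("(" + k.getKey() + "," + k.getValue() + ","
                        + w.getKey() + "," + windowEnd + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 3-minute windows sliding every minute, to keep the output short.
        List<Event> events = Arrays.asList(
                new Event("a", 10, 30_000L),
                new Event("a", 5, 90_000L),
                new Event("b", 7, 90_000L));
        for (String line : aggregate(events, 180_000L, 60_000L)) {
            System.out.println(line);
        }
    }
}
```

With the question's settings (30-minute size, 1-minute slide), windowStarts returns 30 entries per record, which is why every record contributes to 30 overlapping sums. In the Flink version you would do the same summation over the input iterable inside apply and collect a tuple that also carries window.getStart() and window.getEnd(); note that with keyBy(0) the key type seen by the function is Tuple.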
Upvotes: 1