Reputation: 305
The first question:
I want to understand how window timing behaves. Assume I process data in 2-second windows using processing time, the current time is 10:26:25.169, and I deploy the job at that moment.
In this case, will each time window be aligned to 0, 2, 4, ... second boundaries, like below?
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
As you can see, I deployed the job at 10:26:25.169, but Flink aligned the windows to 2-second boundaries. Is that right?
If not, do the windows work like below?
10:26:25.169 - 10:26:27.169
10:26:27.169 - 10:26:29.169
Which one is true? Does this behaviour change when I use event time instead of processing time?
The second question:
I want to keep state for each key. For that I can use a RichFlatMapFunction or a KeyedProcessFunction. But can I still manage state with these functions after a window has been applied? For example:
// in this case i can manage state by key
ds.keyBy(_.field).process(new MyStateFunction)
// in this case, can i manage state after window for the same key?
ds.keyBy(keyExtractorWithTime)
.window(new MyWindowFunction)
.reduce(new myRedisFunction)
.process(new MyStateFunction)
Upvotes: 0
Views: 1211
Reputation: 3
Question 1: If the offset parameter is not set, Flink uses integer multiples of the window size as the startTime by default (endTime = startTime + windowSize). So the first alignment you described is correct:
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
In Flink, the startTime is calculated this way:
/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
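To check the arithmetic against the numbers in the question, here is a minimal plain-Java sketch (no Flink dependency). The class name, the `millisAt` helper, and the date 2020-01-01 in UTC are assumptions made only for illustration; `windowStart` copies the formula quoted above.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class WindowStartSketch {
    // Same arithmetic as getWindowStartWithOffset quoted above.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical helper: epoch millis for a UTC wall-clock time on an arbitrary date.
    static long millisAt(int hour, int minute, int second, int milli) {
        return LocalDateTime.of(2020, 1, 1, hour, minute, second, milli * 1_000_000)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long deployTime = millisAt(10, 26, 25, 169); // moment the job is deployed
        long size = 2_000L;                          // 2-second tumbling window
        long start = windowStart(deployTime, 0L, size);
        long end = start + size;
        // Prints the window that contains the deploy time.
        System.out.println(Instant.ofEpochMilli(start) + " - " + Instant.ofEpochMilli(end));
    }
}
```

With an offset of 0, the deploy time 10:26:25.169 falls into the window starting at 10:26:24.000 and ending at 10:26:26.000, because 25.169 s leaves a remainder of 1.169 s when divided by the 2 s window size.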
Question 2: If you want to manage state after a keyed window, the approach below may work. It lets you manage the state and the reduce function's results for each window.
DataStream<SensorReading> input = ...;

input
    .keyBy(<key selector>)
    .timeWindow(<duration>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r2 : r1;
    }
}

private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

    public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
        SensorReading min = minReadings.iterator().next();
        out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
    }
}
For more details, see here.
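To make the data flow of the reduce-plus-window-function pattern concrete, here is a rough plain-Java simulation. It does not use the Flink API: the `Reading` class is a hypothetical stand-in for SensorReading, and the `id@windowStart` map key is only an illustration device. It shows that the incremental reduce keeps a single value (here the minimum) per key and window, which is why the window function above reads just one element from its Iterable.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReducePerWindowSketch {
    // Hypothetical stand-in for the SensorReading type used above.
    static final class Reading {
        final String id;
        final long timestamp;
        final double value;

        Reading(String id, long timestamp, double value) {
            this.id = id;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Keeps one reduced reading per (key, window start), like an incremental reduce.
    static Map<String, Reading> minPerWindow(List<Reading> readings, long windowSize) {
        Map<String, Reading> reduced = new TreeMap<>();
        for (Reading r : readings) {
            long start = r.timestamp - (r.timestamp % windowSize); // offset 0
            String keyAndWindow = r.id + "@" + start;
            // Same comparison as MyReduceFunction: keep the smaller value.
            reduced.merge(keyAndWindow, r, (r1, r2) -> r1.value > r2.value ? r2 : r1);
        }
        return reduced;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("s1", 24_100L, 5.0),
                new Reading("s1", 25_900L, 3.0),
                new Reading("s1", 26_500L, 7.0),
                new Reading("s2", 24_500L, 1.0));
        minPerWindow(readings, 2_000L)
                .forEach((k, r) -> System.out.println(k + " -> " + r.value));
    }
}
```

In the real job, Flink performs this bookkeeping internally; the sketch only mirrors which value reaches the window function for each key and window.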
Upvotes: 0
Reputation: 3874
As for the first question, the windows are always aligned to full 2-second intervals, so basically as you described:
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
There is an offset argument that lets you control that behaviour to some extent. While Flink only creates a window when data arrives, the startTime and endTime do not depend on when the data arrives, so the data is fit into the window, not the other way around.
More info can be found here
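The point that data is fit into fixed windows can be illustrated with a small plain-Java sketch. It does not use Flink itself; it applies the start formula from Flink's getWindowStartWithOffset, quoted elsewhere in this thread. The class name and the sample timestamps are made up for illustration, and timestamps are given as milliseconds past 10:26:00, which preserves the remainder modulo 2000 because a minute boundary is a multiple of 2 seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowAssignmentSketch {
    // Start of the tumbling window containing the timestamp.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Groups timestamps by the start of the window they fall into.
    static Map<Long, List<Long>> assign(long[] timestamps, long offset, long windowSize) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, offset, windowSize), k -> new ArrayList<>())
                   .add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Milliseconds past 10:26:00; the first element arrives at 10:26:25.169.
        long[] arrivals = {25_169L, 25_900L, 26_000L, 27_168L, 27_169L};

        // Offset 0: windows start at 24000 and 26000, regardless of the first arrival.
        System.out.println(assign(arrivals, 0L, 2_000L));

        // Offset 1169 ms: boundaries shift to 25169, 27169, and so on.
        System.out.println(assign(arrivals, 1_169L, 2_000L));
    }
}
```

With offset 0, the first two elements land in the window starting at 10:26:24.000 and the rest in the one starting at 10:26:26.000. Only an explicit offset (1169 ms in this sketch) moves the boundaries to 10:26:25.169; the arrival time of the first element never does.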
As for the second question, the ProcessWindowFunction is a keyed function, so you can use keyed state inside it just as you would in a standard ProcessFunction.
Upvotes: 1