Reputation: 43
I'm trying to understand watermarks with event time.
My code is similar to the WordCount example in the Flink documentation.
I made some changes to include a timestamp on each event and added watermarks.
The event format is: word;timestamp
The map function creates a Tuple3 of (word, 1, timestamp).
A watermark strategy is then assigned, with a timestamp assigner that reads the event's timestamp field.
For the following stream events:
I got the following result: (test,6). This is correct; I sent the word test 6 times.
But looking at the context in the ProcessWindowFunction, I see the following:
Processing Time: Fri Sep 02 15:27:20 WEST 2022
Watermark: Fri Sep 02 15:26:56 WEST 2022
Start Window: 2022 09 02 15:26:40 End Window: 2022 09 02 15:27:20
The window is correct: it is a 40-second window, as defined. The watermark is also correct: it is 20 seconds behind the last event timestamp (1662128836581 = Friday, September 2, 2022 3:27:16 PM), as defined in the watermark strategy.
My question is about the processing time at which the window fired. It fired exactly when processing time reached the end of the window, but shouldn't it wait until the watermark passes the end of the window (something like processing time = end of window + 20 seconds) (Window Default Trigger Docs)?
What am I doing wrong? Or am I misunderstanding watermarks?
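To make the arithmetic above concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a bounded-out-of-orderness watermark and a tumbling window's bounds relate. The class and method names are hypothetical, chosen only for illustration. The formulas assume Flink's usual behaviour: the watermark is the largest timestamp seen minus the bound minus 1 ms, windows have zero offset, and an event-time window fires once the watermark reaches the window's last millisecond.

```java
import java.time.Duration;

// Hypothetical helper class, not part of Flink; it only mirrors the arithmetic.
public class WatermarkMath {

    // Watermark for a bounded-out-of-orderness strategy:
    // largest timestamp seen so far, minus the bound, minus 1 ms.
    public static long watermark(long maxTimestampMs, Duration bound) {
        return maxTimestampMs - bound.toMillis() - 1;
    }

    // Start of the tumbling window (zero offset) that contains the timestamp.
    public static long windowStart(long timestampMs, Duration size) {
        return timestampMs - Math.floorMod(timestampMs, size.toMillis());
    }

    // An event-time window fires once the watermark reaches
    // the window's last millisecond (end - 1).
    public static boolean eventTimeWindowFires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long lastEvent = 1662128836581L;          // last event timestamp from the question
        Duration bound = Duration.ofSeconds(20);  // out-of-orderness bound
        Duration size = Duration.ofSeconds(40);   // window size

        long wm = watermark(lastEvent, bound);
        long start = windowStart(lastEvent, size);
        long end = start + size.toMillis();

        System.out.println("Watermark (ms):    " + wm);
        System.out.println("Window start (ms): " + start);
        System.out.println("Window end (ms):   " + end);
        System.out.println("Event-time window would fire: " + eventTimeWindowFires(wm, end));
    }
}
```

Under these assumptions, an event-time window ending at 15:27:20 cannot fire while the watermark is still at 15:26:56. It would fire only after an event with a timestamp of at least end of window + 20 seconds arrives, because watermarks advance with event timestamps, not with the wall clock.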
My Code:
public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Watermarks trail the largest event timestamp seen so far by 20 seconds.
        WatermarkStrategy<Tuple3<String, Integer, Long>> strategy = WatermarkStrategy
                .<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.f2);

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .map(new Splitter())
                // The next line is described in the text but missing from this excerpt.
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(value -> value.f0)
                // Assumed: the 40-second window line is also missing from this excerpt. A
                // processing-time assigner is inferred from the window firing at its end time.
                .window(TumblingProcessingTimeWindows.of(Time.seconds(40)))
                .process(new MyProcessWindowFunction());

        env.execute("Window WordCount");
    }

    // Parses "word;timestamp" into (word, 1, timestamp).
    public static class Splitter extends RichMapFunction<String, Tuple3<String, Integer, Long>> {
        @Override
        public Tuple3<String, Integer, Long> map(String value) throws Exception {
            String[] word = value.split(";");
            return new Tuple3<String, Integer, Long>(word[0], 1, Long.parseLong(word[1]));
        }
    }

    // Sums the counts per key and prints the window bounds, watermark and processing time.
    public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow> {
        @Override
        public void process(String s, ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer sum = 0;
            for (Tuple3<String, Integer, Long> in : elements) {
                sum += in.f1; // loop body missing from this excerpt; restored to match the (test,6) result
            }
            out.collect(new Tuple2<String, Integer>(s, sum));
            Date date = new Date(context.window().getStart());
            Date date2 = new Date(context.window().getEnd());
            Date watermark = new Date(context.currentWatermark());
            Date processingTime = new Date(context.currentProcessingTime());
            System.out.println("Processing Time: " + processingTime);
            Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss");
            System.out.println("Watermark: " + watermark);
            System.out.println("Start Window: " + format.format(date) + " End Window: " + format.format(date2));
        }
    }
}
Upvotes: 1
Views: 945
Reputation: 43439
To get event time windows, you need to change
Upvotes: 3