Reputation: 868
Currently, I am using Flink to conduct research on stream processing engines. For my study, I work with historical streams, which consist of tuples of the following form:
event_time, attribute_1, ..., attribute_X
where event_time is used as the TimeCharacteristic.EventTime during processing. Furthermore, I push my datasets into the processing topology either by (i) creating in-memory structures, or (ii) reading the CSV files themselves.
Unfortunately, I have noticed that even when enough tuples have arrived at a window operator to complete a full window, that window is not pushed downstream for processing. As a result, performance drops significantly, and with large historical streams I often get an OutOfMemoryError exception.
To illustrate a typical use-case, I present the following example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);

List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));

DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);

stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
                return t.f0;
            }
        })
    .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1)))
    .sum(1)
    .print();

env.execute();
According to l's contents, I expect the following windowed results (each list item can be read as [start-timestamp, end-timestamp), Sum: X).
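For reference, the expected sums can be reproduced with a short plain-Java sketch (no Flink involved) that assigns each tuple to its sliding windows of size 2 ms and slide 1 ms, matching the pipeline above; WindowSums is a hypothetical helper class for illustration, not part of any Flink API:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java sketch of sliding-window assignment with size = 2 ms and
// slide = 1 ms: a window [s, s + 2) contains every tuple whose
// timestamp t satisfies s <= t < s + 2.
public class WindowSums {
    static final long SIZE = 2, SLIDE = 1;

    static SortedMap<Long, Long> computeSums(long[][] data) {
        SortedMap<Long, Long> sums = new TreeMap<>();
        for (long[] d : data) {
            long ts = d[0], value = d[1];
            // the tuple falls into the windows starting at ts - SIZE + 1 .. ts
            for (long start = ts - SIZE + 1; start <= ts; start += SLIDE) {
                sums.merge(start, value, Long::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] data = {{1, 11}, {2, 22}, {3, 33}, {4, 44}, {5, 55}};
        computeSums(data).forEach((start, sum) ->
                System.out.println("[" + start + ", " + (start + SIZE) + "), Sum: " + sum));
    }
}
```

Running this prints six windows, [0, 2) through [5, 7), with sums 11, 33, 55, 77, 99, and 55 respectively.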
I expect Flink to produce a windowed result every time a tuple with a timestamp past the end-timestamp of an open window appears. For instance, I expect the summation for window [1, 3) to be produced when the tuple with timestamp 4L is fed into the window operator. However, processing only starts after all the tuples from l have been pushed into the stream's topology. The same thing happens when I work with larger historical streams, which results in degraded performance (or even exhausted memory).
Question: How can I force Flink to push windows downstream for processing as soon as a window is complete?
I believe that for SlidingEventTimeWindows, the firing of a window is triggered by watermarks. If that is true, how can I write my topologies so that windows are triggered as soon as a tuple with a later timestamp arrives?
Thank you
Upvotes: 0
Views: 529
Reputation: 43707
AscendingTimestampExtractor uses the periodic watermarking strategy, in which Flink calls the getCurrentWatermark() method every n milliseconds, where n is the auto-watermark interval.
The default interval is 200 milliseconds, which is very long compared to the size of your windows. However, they aren't directly comparable -- the 200 msec is measured in processing time, not event time. Nevertheless, I suspect that if you haven't changed this configuration setting, then a lot of windows are created before the first watermark is emitted, which I think explains what you are seeing.
You could reduce the auto-watermarking interval (perhaps to 1 millisecond). Or you could implement an AssignerWithPunctuatedWatermarks, which will give you more control.
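As a sketch of both remedies, assuming the pre-1.11 DataStream API that the question's code uses (where AssignerWithPunctuatedWatermarks is available, and env and stream refer to the variables from the question):

```java
// Option 1: shorten the periodic watermark interval (measured in
// processing time) so watermarks are emitted far more often.
env.getConfig().setAutoWatermarkInterval(1);

// Option 2: emit a watermark with every element. With strictly
// ascending timestamps, the watermark can simply follow each
// element's own timestamp; returning null here would suppress
// watermark emission for that element.
stream.assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>>() {
            @Override
            public long extractTimestamp(Tuple2<Long, Integer> element,
                                         long previousElementTimestamp) {
                return element.f0;
            }

            @Override
            public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement,
                                                      long extractedTimestamp) {
                return new Watermark(extractedTimestamp);
            }
        });
```

With per-element watermarks, each sliding window can fire as soon as the watermark passes the window's end, so windows no longer pile up waiting for the next periodic watermark.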
Upvotes: 1