Reputation: 381
List<Integer> lss = new ArrayList<>();
lss.add(2);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.fromCollection(lss)
    .keyBy(x -> "1")
    .timeWindow(Time.seconds(4), Time.seconds(1))
    .reduce((x, y) -> 3)
    .map(x -> "vcvv")
    .print();
env.execute("xxx");
I am writing a Flink demo to get started with Flink, and there are a couple of things I don't understand:
1. If I change TimeCharacteristic.IngestionTime to TimeCharacteristic.ProcessingTime, Flink outputs nothing.
2. With a window length of 4 seconds and a slide of 1 second, I expected the outputs to be printed 1 second apart. Instead, 4 outputs were printed at the same time.
Upvotes: 0
Views: 922
Reputation: 43717
Flink outputs nothing when using processing time semantics because the job has finished running before the windows can close. If you supplied enough data that the job was able to run for at least 4 seconds, you would see some output.
With ingestion time semantics, you are seeing the results all at once because all of the windows are being closed at the same time, as the job ends. This occurs when the input stream has been fully ingested, and the job is shutting down -- at which point a signal is sent throughout the cluster that closes all remaining event time (and ingestion time) windows.
Upvotes: 1
Reputation: 3874
This is most probably because you are using the IngestionTime characteristic, which internally is similar to EventTime, with the difference that timestamps and watermarks are generated automatically by Flink. This means that once all elements have been processed, Flink generates a watermark of Long.MAX_VALUE, which flushes all results and closes the windows.
This does not happen for ProcessingTime, so you would need to handle this yourself somehow. I suggest referring to the docs.
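To make the effect of the final watermark concrete, here is a small plain-Java simulation (it does not use Flink itself). It assumes the event-time firing rule that a window [start, end) fires once the watermark reaches end - 1. The class name, method name, window ends, and the intermediate watermark value are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkFiringSketch {
    // Returns the end timestamps of the windows that fire when the watermark
    // advances from previousWatermark to newWatermark. A window [start, end)
    // is treated as firing once the watermark reaches end - 1.
    static List<Long> firedWindowEnds(long[] windowEnds, long previousWatermark, long newWatermark) {
        List<Long> fired = new ArrayList<>();
        for (long end : windowEnds) {
            long maxTimestamp = end - 1;
            if (maxTimestamp > previousWatermark && maxTimestamp <= newWatermark) {
                fired.add(end);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical: four overlapping windows holding the single element.
        long[] windowEnds = {6000, 7000, 8000, 9000};
        // Hypothetical watermark while the short job is still running.
        long watermarkWhileRunning = 5300;

        // No window end has been passed yet, so nothing fires.
        System.out.println(firedWindowEnds(windowEnds, Long.MIN_VALUE, watermarkWhileRunning));
        // The end-of-input watermark passes every window end at once.
        System.out.println(firedWindowEnds(windowEnds, watermarkWhileRunning, Long.MAX_VALUE));
    }
}
```

The second call returns all four windows together, which matches the four results being printed at the same moment when the bounded input is exhausted.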
As for the second question, this is not how windows work. Depending on the selected TimeCharacteristic, windows group elements either by the time Flink receives them or by their timestamp (EventTime). The slide determines how often a new window starts, and therefore how much consecutive windows overlap: with a size of 4 seconds and a slide of 1 second, each element belongs to 4 overlapping windows. Once again, please refer to the docs.
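As a rough illustration of overlapping sliding windows, the plain-Java sketch below computes which windows a single timestamp falls into. It assumes non-negative millisecond timestamps and windows aligned to multiples of the slide (no offset). The class name, method name, and the sample timestamp are hypothetical; this is not Flink's own API.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowAssignment {
    // Returns the start timestamps (in ms) of every sliding window
    // [start, start + size) that contains the given timestamp.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 4000;   // 4-second window, as in the question
        long slide = 1000;  // 1-second slide, as in the question
        for (long start : windowStarts(5300, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}
```

With a 4-second size and a 1-second slide, the single element lands in four windows, so a job with one input element produces four window results.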
Upvotes: 0