abracadabra

Reputation: 381

flink doesn't output anything

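import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

        // Bounded input with a single element
        List<Integer> lss = new ArrayList<>();
        lss.add(2);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.fromCollection(lss)
                .keyBy(x -> "1")                                // one constant key for all elements
                .timeWindow(Time.seconds(4), Time.seconds(1))   // sliding window: size 4 s, slide 1 s
                .reduce((x, y) -> 3)
                .map(x -> "vcvv")
                .print();
        env.execute("xxx");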

I am writing a Flink demo to get started with Flink, and there are a couple of things I don't understand:
1. If I change TimeCharacteristic.IngestionTime to TimeCharacteristic.ProcessingTime, Flink outputs nothing.
2. With a window length of 4 seconds and a slide of 1 second, I expected the outputs to be printed 1 second apart. Instead, 4 outputs were printed at the same time.

Upvotes: 0

Views: 922

Answers (2)

David Anderson

Reputation: 43717

  1. Flink outputs nothing when using processing time semantics because the job has finished running before the windows can close. If you supplied enough data that the job was able to run for at least 4 seconds, you would see some output.

  2. With ingestion time semantics, you are seeing the results all at once because all of the windows are being closed at the same time, as the job ends. This occurs when the input stream has been fully ingested, and the job is shutting down -- at which point a signal is sent throughout the cluster that closes all remaining event time (and ingestion time) windows.
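As a rough illustration of that shutdown signal: in Flink it takes the form of a final watermark with value Long.MAX_VALUE, and an event-time window fires once the watermark reaches its last timestamp (end - 1). The following is a plain-Java sketch of that rule, not Flink code; the class name WatermarkFiring, the method firedWindows, and the window end times are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WatermarkFiring {

    /** Returns the end timestamps of the windows that would fire for the given watermark. */
    static List<Long> firedWindows(List<Long> windowEnds, long watermark) {
        List<Long> fired = new ArrayList<>();
        for (long end : windowEnds) {
            // A window [start, end) covers timestamps up to end - 1,
            // so it is complete once the watermark reaches end - 1.
            if (watermark >= end - 1) {
                fired.add(end);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical end times (in ms) of four overlapping sliding windows.
        List<Long> ends = Arrays.asList(11_000L, 12_000L, 13_000L, 14_000L);

        // Watermark still before the first window end: nothing fires.
        System.out.println(firedWindows(ends, 10_500L));

        // Final watermark at the end of a bounded input: every window fires.
        System.out.println(firedWindows(ends, Long.MAX_VALUE));
    }
}
```

With the final watermark, all four windows pass the check in the same step, which matches the four results appearing together.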

Upvotes: 1

Dominik Wosiński

Reputation: 3874

It's most probably because you are using the IngestionTime characteristic, which internally is similar to EventTime, with the difference that timestamps and watermarks are generated automatically by Flink. It basically means that once all elements of the bounded input have been processed, Flink generates a final watermark with value Long.MAX_VALUE, which flushes all results and closes the windows.

This will not happen for ProcessingTime, so you would need to handle this yourself somehow. I suggest referring to the docs.

As for the second question, that is not how sliding windows work. Depending on the selected TimeCharacteristic, a window collects elements either by the time Flink receives them or by their timestamp (EventTime). The slide only determines how often a new window starts, and therefore how much consecutive windows overlap: with a size of 4 seconds and a slide of 1 second, every element falls into four windows. Once again, please refer to the docs.
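To make the overlap concrete, here is a minimal plain-Java sketch of how one timestamp maps to sliding windows. It is not Flink's implementation: the class SlidingWindows and the method assign are hypothetical names, and the sketch assumes non-negative timestamps, no window offset, and a size that is a multiple of the slide.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {

    /** Returns the [start, end) pairs of all sliding windows that contain the timestamp. */
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // One element at t = 10.5 s, window size 4 s, slide 1 s (all values in ms).
        for (long[] w : assign(10_500L, 4_000L, 1_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For this input the loop yields four windows, starting at 10000, 9000, 8000 and 7000 ms, so a single element contributes to four separate results.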

Upvotes: 0
