JackGao

Reputation: 126

Why does Flink not drop late data?

I am calculating the maximum value of a simple stream, and the result is:

(S1,1000,S1, value: 999)

(S1,2000,S1, value: 41)

The last input element is obviously late: new SensorReading("S1", 999, 100L)

Why was it included in the first window (0-1000)?

I expected the first window to have fired already by the time SensorReading("S1", 41, 1000L) arrived.

I am very confused about this result.

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(TrainingBase.parallelism);

        DataStream<SensorReading> input = env.fromElements(
                new SensorReading("S1", 35, 500L),
                new SensorReading("S1", 42, 999L),
                new SensorReading("S1", 41, 1000L),
                new SensorReading("S1", 40, 1200L),
                new SensorReading("S1", 23, 1400L),
                new SensorReading("S1", 999, 100L)
        );


        input.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
            private long currentMaxTimestamp;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp);
            }

            @Override
            public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
                currentMaxTimestamp = element.ts;
                return currentMaxTimestamp;
            }
        })
                .keyBy((KeySelector<SensorReading, String>) value -> value.sensorName)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .reduce(new MyReducingMax(), new MyWindowFunction())
                .print();

        env.execute();

MyReducingMax and MyWindowFunction are defined as follows:

    private static class MyReducingMax implements ReduceFunction<SensorReading> {
        // Keep whichever of the two readings has the larger value.
        @Override
        public SensorReading reduce(SensorReading r1, SensorReading r2) {
            return r1.getValue() > r2.getValue() ? r1 : r2;
        }
    }

    private static class MyWindowFunction extends
            ProcessWindowFunction<SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

        @Override
        public void process(
                String key,
                Context context,
                Iterable<SensorReading> maxReading,
                Collector<Tuple3<String, Long, SensorReading>> out) {

            SensorReading max = maxReading.iterator().next();
            out.collect(new Tuple3<>(key, context.window().getEnd(), max));
        }
    }

    public static class SensorReading {
        String sensorName;
        int value;
        Long ts;

        public SensorReading() {
        }

        public SensorReading(String sensorName, int value, Long ts) {
            this.sensorName = sensorName;
            this.value = value;
            this.ts = ts;
        }

        public Long getTs() {
            return ts;
        }

        public void setTs(Long ts) {
            this.ts = ts;
        }

        public String getSensorName() {
            return sensorName;
        }

        public void setSensorName(String sensorName) {
            this.sensorName = sensorName;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public String toString() {

            return this.sensorName + "(" + this.ts + ") value: " + this.value;
        }

    }

Upvotes: 0

Views: 273

Answers (1)

David Anderson

Reputation: 43524

An AssignerWithPeriodicWatermarks doesn't create a Watermark at every conceivable opportunity. Instead, Flink calls such an assigner periodically to get the latest watermark, and by default it does so every 200 ms (of wall-clock time, not event time). This interval is controlled by ExecutionConfig.setAutoWatermarkInterval(...).
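As a rough illustration, the interval could be changed on the env object from the question like this. This is a fragment rather than a complete program, and the 50 ms value is an arbitrary assumption. Because the interval is measured in wall-clock time, a handful of elements from fromElements will most likely still be processed before the first call, so this alone is unlikely to change the result.

```java
// Fragment only: assumes the `env` variable from the question and Flink on the classpath.
// The 50 ms value is illustrative, not a recommendation.
env.getConfig().setAutoWatermarkInterval(50L); // ask periodic assigners for a watermark every 50 ms
```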

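This means that all six of your test events have almost certainly been processed before your watermark assigner could be called. Since no watermark had been emitted yet, the first window had not fired when the event with timestamp 100 arrived, so that event was not late and was added to the window.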

If you care about having more predictable watermarking, you could use an AssignerWithPunctuatedWatermarks instead.
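To make the difference concrete, here is a small plain-Java simulation that does not use Flink at all. It is only a sketch under simplifying assumptions: a single key, one-second tumbling windows, no allowed lateness, and a final watermark of Long.MAX_VALUE once the bounded input ends. The class and method names (WatermarkTimingSketch, run, fire) are made up for this illustration. With watermarkPerElement set to false it models the case where the periodic assigner is never called before the input is exhausted; with true it models a watermark after every element, which is what a punctuated assigner can provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WatermarkTimingSketch {

    static final long WINDOW_SIZE_MS = 1000L;

    // {value, timestamp} pairs in the arrival order used in the question.
    static final long[][] EVENTS = {
        {35, 500}, {42, 999}, {41, 1000}, {40, 1200}, {23, 1400}, {999, 100}
    };

    // Replays the events through a simplified tumbling max-window.
    static List<String> run(long[][] events, boolean watermarkPerElement) {
        TreeMap<Long, Long> maxByWindowEnd = new TreeMap<>(); // window end -> max value so far
        List<String> output = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long[] e : events) {
            long value = e[0];
            long ts = e[1];
            long windowEnd = ts - (ts % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;

            // An element is late if the watermark has already reached the
            // last timestamp of its window (window end - 1).
            if (windowEnd - 1 <= watermark) {
                continue; // dropped as late
            }
            maxByWindowEnd.merge(windowEnd, value, Math::max);

            if (watermarkPerElement) {
                watermark = Math.max(watermark, ts);
                fire(maxByWindowEnd, watermark, output);
            }
        }
        // End of bounded input: a final watermark flushes every remaining window.
        fire(maxByWindowEnd, Long.MAX_VALUE, output);
        return output;
    }

    // Emits and removes every window whose last timestamp the watermark has reached.
    static void fire(TreeMap<Long, Long> windows, long watermark, List<String> output) {
        while (!windows.isEmpty() && windows.firstKey() - 1 <= watermark) {
            Map.Entry<Long, Long> w = windows.pollFirstEntry();
            output.add("(" + w.getKey() + "," + w.getValue() + ")");
        }
    }

    public static void main(String[] args) {
        System.out.println("no watermark until end: " + run(EVENTS, false));
        System.out.println("watermark per element:  " + run(EVENTS, true));
    }
}
```

In this model the first mode yields (1000,999) and (2000,41), matching the output in the question, while the second yields (1000,42) and (2000,41) because the event with timestamp 100 arrives after the watermark has passed 999 and is dropped.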

BTW, the way that your watermark assigner is written, all of the out-of-order events are potentially late. It is more typical to use a BoundedOutOfOrdernessTimestampExtractor that allows for some out-of-orderness.
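The effect of allowing some out-of-orderness can be sketched in plain Java as well. This is a simplified model, not Flink code: it assumes the watermark is always up to date and equals the largest timestamp seen so far minus a fixed bound, with one-second tumbling windows and no allowed lateness. The names BoundedOutOfOrdernessSketch and isLate are hypothetical.

```java
class BoundedOutOfOrdernessSketch {

    static final long WINDOW_SIZE_MS = 1000L;

    // Returns true if an event with timestamp ts would be considered late,
    // given the timestamps that arrived before it and the out-of-orderness bound.
    static boolean isLate(long[] earlierTimestamps, long ts, long maxOutOfOrdernessMs) {
        if (earlierTimestamps.length == 0) {
            return false; // nothing seen yet, so no watermark has been produced
        }
        long maxTs = Long.MIN_VALUE;
        for (long t : earlierTimestamps) {
            maxTs = Math.max(maxTs, t);
        }
        // The watermark trails the largest timestamp seen by the bound.
        long watermark = maxTs - maxOutOfOrdernessMs;
        // Last timestamp that still belongs to the event's tumbling window.
        long windowMaxTimestamp = ts - (ts % WINDOW_SIZE_MS) + WINDOW_SIZE_MS - 1;
        return windowMaxTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long[] earlier = {500L, 999L, 1000L, 1200L, 1400L};
        System.out.println("bound 0 ms:   late = " + isLate(earlier, 100L, 0L));
        System.out.println("bound 500 ms: late = " + isLate(earlier, 100L, 500L));
    }
}
```

With a bound of 0 the watermark sits at 1400 and the event with timestamp 100 is late. With a bound of 500 ms the watermark is only at 900, so the window ending at 1000 is still open and the event is accepted.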

Upvotes: 1
