Eric Zheng

Reputation: 1144

Flink side output for late data missing

This is my application code:

object StreamingJob {
    def main(args: Array[String]): Unit = {
        // set up the streaming execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // define EventTime and Watermark
        val sensorData: DataStream[SensorReading] = env.addSource(new SensorSource).assignTimestampsAndWatermarks(
            WatermarkStrategy
                .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(0))
                .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
                    override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp
                })
        )

        val outputTag = OutputTag[SensorReading]("late-event")

        val minTemp: DataStream[String] = sensorData
            .map(r => {
                val celsius = (r.temperature - 32) * (5.0 / 9.0)
                SensorReading(r.id, r.timestamp, celsius)
            })
            .keyBy(_.id)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .allowedLateness(Time.seconds(10))
            .sideOutputLateData(outputTag)
            // compute min temperature
            .process(new TemperatureMiner)

        val lateStream: DataStream[SensorReading] = minTemp.getSideOutput(outputTag)
        lateStream.map(r => s"late event: ${r.id}, ${r.timestamp}, ${r.temperature}").print()

        minTemp.print()

        // execute program
        env.execute("Flink Streaming Scala API Skeleton")
    }
}

I am fairly sure that late data is being captured, because the logs show TemperatureMiner being invoked multiple times for the same window, which indicates late firings.

The problem is that lateStream, the side output for late data, prints nothing. Any idea why?

Upvotes: 0

Views: 518

Answers (1)

David Anderson

Reputation: 43524

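The side output for late data in a window only receives events that are so late they fall outside the allowed lateness. Events that arrive after the window has fired but still within the allowed lateness (10 seconds in your job) are added to the window and trigger the late firings you are seeing, so they never reach the side output. Perhaps none of your late data is late enough.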
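
To make the rule concrete, here is a minimal sketch in plain Scala (no Flink dependency) of how an event's fate depends on the current watermark. It assumes the 5-second tumbling windows and 10-second allowed lateness from the question, non-negative timestamps in milliseconds, and no window offset. The names LatenessSketch, classify, and the Outcome cases are made up for illustration and are not part of the Flink API; the comparisons reflect my understanding of how the window operator decides, so treat it as an approximation rather than the actual implementation.

```scala
object LatenessSketch {
  // Hypothetical outcome labels, used only in this sketch.
  sealed trait Outcome
  case object OnTime extends Outcome      // window has not fired yet
  case object LateFiring extends Outcome  // window already fired, still within allowed lateness
  case object SideOutput extends Outcome  // window expired, event goes to the late-data side output

  val windowSizeMs: Long = 5000L        // TumblingEventTimeWindows.of(Time.seconds(5))
  val allowedLatenessMs: Long = 10000L  // allowedLateness(Time.seconds(10))

  def classify(eventTimestamp: Long, currentWatermark: Long): Outcome = {
    // Simplified window assignment: assumes non-negative timestamps and no offset.
    val windowStart = eventTimestamp - (eventTimestamp % windowSizeMs)
    // The last timestamp that belongs to the window (the end is exclusive).
    val windowMaxTimestamp = windowStart + windowSizeMs - 1

    if (windowMaxTimestamp + allowedLatenessMs <= currentWatermark) SideOutput
    else if (windowMaxTimestamp <= currentWatermark) LateFiring
    else OnTime
  }

  def main(args: Array[String]): Unit = {
    // An event with timestamp 1000 belongs to the window [0, 5000).
    Seq(3000L, 6000L, 14998L, 14999L).foreach { wm =>
      println(s"watermark=$wm -> ${classify(1000L, wm)}")
    }
  }
}
```

Under these assumptions, an event for the window [0, 5000) only reaches the side output once the watermark has advanced to at least 14999, which is roughly 10 seconds past the end of the window. If your source never produces events that far behind the watermark, the side output stays empty even though late firings occur.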

Upvotes: 1
