This is my application code:
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// SensorReading, SensorSource and TemperatureMiner are my own classes (not shown here)
object StreamingJob {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // define event-time timestamps and watermarks (no out-of-orderness tolerated)
    val sensorData: DataStream[SensorReading] = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
            override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp
          })
      )

    // tag for the late-data side output
    val outputTag = OutputTag[SensorReading]("late-event")

    val minTemp: DataStream[String] = sensorData
      .map(r => {
        // convert Fahrenheit to Celsius
        val celsius = (r.temperature - 32) * (5.0 / 9.0)
        SensorReading(r.id, r.timestamp, celsius)
      })
      .keyBy(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .allowedLateness(Time.seconds(10))
      .sideOutputLateData(outputTag)
      // compute min temperature
      .process(new TemperatureMiner)

    val lateStream: DataStream[SensorReading] = minTemp.getSideOutput(outputTag)
    lateStream.map(r => s"late event: ${r.id}, ${r.timestamp}, ${r.temperature}").print()
    minTemp.print()

    // execute program
    env.execute("Flink Streaming Scala API Skeleton")
  }
}
I am pretty sure that late data are being captured, because my logs show that TemperatureMiner
is invoked multiple times for one window, i.e. the window has late firings.
The problem is that lateStream prints nothing from the side output for late data. Any idea why?
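A window's side output for late data only receives elements that are so late that they fall outside the allowed lateness, i.e. elements that arrive after the watermark has already passed the end of their window plus the allowed lateness (10 seconds in your job). An element that is late but still within the allowed lateness is added to its window and makes the window fire again; that is the repeated TemperatureMiner invocation you are seeing, and such an element never reaches the side output. Perhaps none of your late data is late enough.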
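To make the distinction concrete, here is a minimal, self-contained sketch in plain Scala (no Flink dependency) that models the decision for one key using the question's parameters: 5 s tumbling windows, 10 s allowed lateness, 0 s out-of-orderness. It is a simplified model, not Flink code: the names `LatenessModel`, `classify` and `replay` are hypothetical, timestamps are assumed non-negative, and the watermark is updated after every event, whereas Flink's bounded-out-of-orderness generator emits it periodically, so the real watermark can lag further behind.

```scala
// Hypothetical, simplified model of how an event-time window operator with
// allowed lateness treats an incoming element. Not Flink API code.
object LatenessModel {
  sealed trait Outcome
  case object OnTime extends Outcome     // window has not fired yet
  case object LateFiring extends Outcome // window already fired but state is kept: fires again
  case object SideOutput extends Outcome // window already purged: element goes to the late side output

  val WindowSizeMs = 5000L       // TumblingEventTimeWindows.of(Time.seconds(5))
  val AllowedLatenessMs = 10000L // allowedLateness(Time.seconds(10))
  val OutOfOrdernessMs = 0L      // forBoundedOutOfOrderness(Duration.ofSeconds(0))

  // Last millisecond of the tumbling window containing ts (assumes ts >= 0).
  def windowMaxTimestamp(ts: Long): Long = ts - (ts % WindowSizeMs) + WindowSizeMs - 1

  // Watermark of a bounded-out-of-orderness generator after seeing maxTs.
  def watermark(maxTs: Long): Long = maxTs - OutOfOrdernessMs - 1

  def classify(ts: Long, currentWatermark: Long): Outcome = {
    val windowEnd = windowMaxTimestamp(ts)
    if (windowEnd + AllowedLatenessMs <= currentWatermark) SideOutput
    else if (windowEnd <= currentWatermark) LateFiring
    else OnTime
  }

  // Replays events in arrival order, advancing the watermark after each one.
  def replay(timestamps: Seq[Long]): Seq[(Long, Outcome)] = {
    var currentWm = Long.MinValue
    var maxTs = Long.MinValue
    timestamps.map { ts =>
      val outcome = classify(ts, currentWm)
      maxTs = math.max(maxTs, ts)
      currentWm = watermark(maxTs)
      ts -> outcome
    }
  }

  def main(args: Array[String]): Unit =
    replay(Seq(1000L, 6000L, 3000L, 21000L, 4000L)).foreach { case (ts, o) =>
      println(s"$ts -> $o")
    }
}
```

With these inputs, the event at 3000 ms arrives when the watermark is 5999, so window [0, 5000) merely fires again (`LateFiring`); the event at 4000 ms arrives when the watermark is 20999, which is at least 4999 + 10000, so it is routed to the side output. Under this model, with 0 s out-of-orderness an event has to trail the newest event seen by roughly 10 to 15 s (depending on where it falls in its window) before it can show up in lateStream.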