Reputation: 23
Even though Flink has some built-in tooling to handle late data, like allowed lateness, I'd like to handle late data myself. For example, I'd like to monitor late events or just save them to a database.
How can I do that?
Upvotes: 1
Views: 442
Reputation: 18997
ProcessFunctions (ProcessFunction, KeyedProcessFunction, etc.) provide access to the event timestamp of a record and to the TimerService via a Context object. The TimerService gives access to the current watermark.
You can identify late records by comparing the event timestamp with the watermark. If the timestamp is less than or equal to the watermark, the event is late.
It is up to you how you want to handle late events. You can mark them, you can discard them, emit them via a side output, or perform any kind of computation with them.
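A minimal sketch of this approach, assuming the Flink 1.x Scala API (deprecated since Flink 1.17) and a stream that already has timestamps and watermarks assigned. The Event case class, the "late-events" tag id, and the names LateEvents, isLate and LateEventSplitter are hypothetical. Late elements go to a side output; everything else goes to the main output:

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object LateEvents {
  // Hypothetical event type for illustration; substitute your own.
  case class Event(rule: String, timestamp: Long)

  // Side output that receives late events ("late-events" is an arbitrary id).
  val lateTag: OutputTag[Event] = OutputTag[Event]("late-events")

  // An element is late if its timestamp is not newer than the current watermark.
  def isLate(timestamp: Long, watermark: Long): Boolean = timestamp <= watermark

  class LateEventSplitter extends ProcessFunction[Event, Event] {
    override def processElement(
        event: Event,
        ctx: ProcessFunction[Event, Event]#Context,
        out: Collector[Event]): Unit = {
      // ctx.timestamp() is a java.lang.Long and is null if no timestamp was assigned
      val ts: java.lang.Long = ctx.timestamp()
      val watermark = ctx.timerService().currentWatermark()
      if (ts != null && isLate(ts.longValue(), watermark)) ctx.output(lateTag, event) // late
      else out.collect(event) // on time (or no timestamp): main output
    }
  }
}
```

Assuming events is a DataStream[LateEvents.Event], events.process(new LateEvents.LateEventSplitter).getSideOutput(LateEvents.lateTag) gives you a stream of only the late events, which you can print, count, or write to a database sink. Reading currentWatermark() works in a non-keyed ProcessFunction; only registering timers requires a keyed stream.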
Upvotes: 0
Reputation: 1009
Usually lateness and watermarks are used with window operators. If you're using a window operator, you can send late elements to a side output like this:
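val lateOutputTag = OutputTag[Event]("late-data") // Event = your element type
val windowStream = eventStream
  .keyBy(output => output.rule)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
  .sideOutputLateData(lateOutputTag)
  .reduce((a, b) => b) // placeholder window function; getSideOutput needs its result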
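Then get the late elements from the side output of the windowed result. Only elements that arrive after the watermark has passed the end of their window plus any allowed lateness end up there: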
windowStream.getSideOutput(lateOutputTag).print()
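Since the question also asks about monitoring late events, here is a hedged sketch of counting them with a Flink metric before passing them on. It assumes Flink 1.x, where RichFunction.open(Configuration) can still be overridden (it is deprecated in newer 1.x releases). The class name LateElementCounter and the metric name "numLateElements" are hypothetical:

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Passes each late element through unchanged while counting it in a Flink metric.
class LateElementCounter[T] extends RichMapFunction[T, T] {
  @transient private var lateCounter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Register the counter once per parallel instance ("numLateElements" is arbitrary)
    lateCounter = getRuntimeContext.getMetricGroup.counter("numLateElements")
  }

  override def map(value: T): T = {
    lateCounter.inc() // one more late element seen
    value
  }
}
```

Usage: windowStream.getSideOutput(lateOutputTag).map(new LateElementCounter[Event]).print(), where Event stands for your element type. To persist the late elements, replace print() with a sink that writes to your database.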
Upvotes: 1