Yuting Gao

Reputation: 23

Apache Flink: How to apply custom logic to late events?

Even though Flink has some built-in tooling for handling late data, such as allowed lateness, I'd like to handle late data myself. For example, I'd like to monitor late events or simply save them to a database.

How can I do that?

Upvotes: 1

Views: 442

Answers (2)

Fabian Hueske

Reputation: 18997

ProcessFunctions (ProcessFunction, KeyedProcessFunction, etc.) provide access to the event timestamp of a record and the TimerService via a Context object. The TimerService gives access to the current watermark.

You can identify late records by comparing the event timestamp with the current watermark. If the timestamp is less than or equal to the watermark, the event is late.

It is up to you how you want to handle late events. You can mark them, you can discard them, emit them via a side output, or perform any kind of computation with them.
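As a minimal sketch of the approach described above, the following ProcessFunction compares each record's timestamp with the current watermark and routes late records to a side output. The `Event` case class, the `LateEvents` object, the `LateEventSplitter` class, and the tag name "late-events" are all hypothetical names chosen for illustration; it assumes the Flink Scala DataStream API and that timestamps and watermarks have already been assigned upstream.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical event type, for illustration only.
case class Event(rule: String, payload: String)

object LateEvents {
  // Hypothetical tag; late records are routed to this side output.
  val lateTag: OutputTag[Event] = OutputTag[Event]("late-events")

  // Lateness rule from the answer: a record is late if its timestamp
  // is less than or equal to the current watermark.
  def isLate(timestamp: Long, watermark: Long): Boolean = timestamp <= watermark
}

class LateEventSplitter extends ProcessFunction[Event, Event] {
  override def processElement(
      event: Event,
      ctx: ProcessFunction[Event, Event]#Context,
      out: Collector[Event]): Unit = {
    val ts = ctx.timestamp() // null if the record carries no event timestamp
    val watermark = ctx.timerService().currentWatermark()
    if (ts != null && LateEvents.isLate(ts, watermark)) {
      ctx.output(LateEvents.lateTag, event) // late: emit via the side output
    } else {
      out.collect(event) // on time (or no timestamp): forward on the main output
    }
  }
}
```

Under these assumptions, you would apply it with eventStream.process(new LateEventSplitter) and read the late records from the result with getSideOutput(LateEvents.lateTag); from there you can count them, log them, or write them to a database with a sink of your choice.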

Upvotes: 0

Jiayi Liao

Reputation: 1009

Lateness and watermarks are usually used with window operators. If you're using a window operator, you can send late data to a side output like this:

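// Event and MyWindowFunction are placeholders for your own types.
val lateOutputTag = OutputTag[Event]("late-data")
val windowStream = eventStream.keyBy(output => output.rule)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
  .sideOutputLateData(lateOutputTag)
  .process(new MyWindowFunction()) // getSideOutput needs the resulting DataStream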

Then get the late elements from the side output like this:

windowStream.getSideOutput(lateOutputTag).print()

Upvotes: 1
