GGamba

Reputation: 13680

Flink windows and late events

I have a use case where I need to handle late events differently from normal events: if an event arrives after its window has closed, it should be sent down a different path.

I thought that .sideOutputLateData(..) would solve this for me, and it does under normal circumstances (i.e. with real-world data). But when I try to test it with fabricated data, it stops working.

I expect that something like:

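import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time

val env = StreamExecutionEnvironment.createLocalEnvironment()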
env.setParallelism(1)

val events: DataStream[(Int, Long)] = env.fromElements(
  (1, 1),
  (1, 15),
  (1, 25),
  (1, 8) //late Event
)
val lateEvents = OutputTag[(Int, Long)]("lateEvents")

val windowedSum = events
  .assignAscendingTimestamps(e => e._2)
  .windowAll(TumblingEventTimeWindows.of(time.Time.milliseconds(10)))
  .sideOutputLateData(lateEvents)
  .sum(position=0)

val lateEventsStream = windowedSum
  .getSideOutput(lateEvents)
  // Handle differently
  .map(e => (e._1 + 100, e._2))

windowedSum.print()
lateEventsStream.print()
// execute program
env.execute("Flink Scala watermarking test")

would result in:

[info] (1,1)
[info] (1,15)
[info] (1,25)
[info] (101, 8)

Instead I get:

[info] (2,1)
[info] (1,15)
[info] (1,25)

If I use a socketTextStream as source with the same data, it works as expected.

This suggests that the watermark somehow does not advance as it should when the input arrives very quickly. I tried setting setAutoWatermarkInterval to a very small value, without success.

Am I missing something? How can I test my job?

Upvotes: 1

Views: 524

Answers (2)

GGamba

Reputation: 13680

Thanks to @Dominik Wosiński for pointing me in the right direction. For anyone else lost in the intricacies of the Flink documentation, I'll post my solution here:

As suspected, the problem was that the fast input data didn't advance the watermark. This is because, by default, Flink asks a periodic watermark generator for a new watermark only every 200 ms. You can shorten this interval (at the cost of more load on the system) using:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(10) // in milliseconds (default 200); or even lower

Evidently this is not enough for input as fast as a four-element list, which can be consumed entirely before the first periodic watermark is emitted.

The solution is to emit a watermark at each event (note that this is not recommended in a production environment).

To implement this solution we need to implement the WatermarkGenerator interface:

import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkOutput}

class MyPunctuatedWatermarkAssigner extends WatermarkGenerator[(Int, Long)] {

  override def onEvent(
      event: (Int, Long),
      eventTimestamp: Long,
      output: WatermarkOutput
  ): Unit = {
    // emit at every event
    output.emitWatermark(new Watermark(event._2))
  }

  // do nothing at AutoWatermarkInterval
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}
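A slightly more defensive variant is sketched below; the class name is hypothetical and this is not part of the original answer. It remembers the highest timestamp seen, so a late event such as (1, 8) never tries to move the watermark backwards, and it emits `maxTimestamp - 1`, the same "minus one" convention Flink's built-in bounded-out-of-orderness generator applies, so that a further element carrying exactly that timestamp is not yet treated as late. It also uses the `eventTimestamp` parameter (the timestamp Flink assigned) instead of reading `event._2` directly.

```scala
import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkOutput}

// Hypothetical name: punctuated generator that only ever moves the watermark forward.
class MonotonicPunctuatedGenerator extends WatermarkGenerator[(Int, Long)] {
  // Highest event timestamp seen so far on this parallel instance.
  private var maxTimestamp: Long = Long.MinValue

  override def onEvent(
      event: (Int, Long),
      eventTimestamp: Long,
      output: WatermarkOutput
  ): Unit = {
    // Only a new maximum can advance the watermark; late events emit nothing.
    if (eventTimestamp > maxTimestamp) {
      maxTimestamp = eventTimestamp
      // "- 1": elements with timestamp == maxTimestamp still count as on time.
      output.emitWatermark(new Watermark(maxTimestamp - 1))
    }
  }

  // Nothing to do on the periodic (auto-watermark-interval) callback.
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}
```

For the question's four events this emits watermarks 0, 14 and 24 and nothing for (1, 8); since 24 is already past the end of the [0, 10) window, (1, 8) should be routed to the late-data side output.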

To assign this generator to a stream, we first need to create a WatermarkStrategy:

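import org.apache.flink.api.common.eventtime.{WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}

class MyStrategy extends WatermarkStrategy[(Int, Long)] {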
  override def createWatermarkGenerator(
      context: WatermarkGeneratorSupplier.Context
  ): WatermarkGenerator[(Int, Long)] = new MyPunctuatedWatermarkAssigner
}

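(This class can also implement the optional createTimestampAssigner method. By default the strategy keeps whatever timestamp the record already carries, so you need it when the records arrive without timestamps, e.g. from fromElements.)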

And then we can use it in the stream:

eventsStream
   .assignTimestampsAndWatermarks(new MyStrategy())
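Putting the pieces together, here is a sketch of the question's test job using a per-event strategy. It assumes a Flink 1.12+ release that still ships the Scala DataStream API (event time is the default time characteristic from 1.12 on). Because elements created with fromElements carry no timestamps of their own, the strategy also overrides createTimestampAssigner to read the timestamp from the tuple. The names PerEventWatermarks, PerEventStrategy and LateEventsTest are hypothetical.

```scala
import org.apache.flink.api.common.eventtime.{TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Condensed version of MyPunctuatedWatermarkAssigner: one watermark per event.
class PerEventWatermarks extends WatermarkGenerator[(Int, Long)] {
  override def onEvent(event: (Int, Long), eventTimestamp: Long, output: WatermarkOutput): Unit =
    output.emitWatermark(new Watermark(eventTimestamp))

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}

// Supplies both the generator and a timestamp assigner (fromElements records have no timestamp).
class PerEventStrategy extends WatermarkStrategy[(Int, Long)] {
  override def createWatermarkGenerator(
      context: WatermarkGeneratorSupplier.Context
  ): WatermarkGenerator[(Int, Long)] = new PerEventWatermarks

  override def createTimestampAssigner(
      context: TimestampAssignerSupplier.Context
  ): TimestampAssigner[(Int, Long)] = new TimestampAssigner[(Int, Long)] {
    // Event time is the second tuple field, in milliseconds.
    override def extractTimestamp(element: (Int, Long), recordTimestamp: Long): Long = element._2
  }
}

object LateEventsTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    env.setParallelism(1)

    val lateEvents = OutputTag[(Int, Long)]("lateEvents")

    val windowedSum = env
      .fromElements((1, 1L), (1, 15L), (1, 25L), (1, 8L)) // (1, 8) is the late event
      .assignTimestampsAndWatermarks(new PerEventStrategy)
      .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
      .sideOutputLateData(lateEvents)
      .sum(0)

    // Late events are handled on their own path.
    val lateEventsStream = windowedSum
      .getSideOutput(lateEvents)
      .map(e => (e._1 + 100, e._2))

    windowedSum.print()
    lateEventsStream.print()
    env.execute("Flink Scala watermarking test")
  }
}
```

With one watermark per element, the watermark is already at 25 when (1, 8) arrives, so the [0, 10) window has fired and (1, 8) should appear on the side output as (101,8) instead of being summed into the first window.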

Upvotes: 1

Dominik Wosiński

Reputation: 3864

You can try a WatermarkStrategy that emits a Watermark for every element instead of generating them periodically. You can do this by emitting the Watermark inside onEvent when implementing WatermarkGenerator, as described here. That is the most reliable approach for testing.

Upvotes: 2
