Reputation: 13680
I have a use case where late events must be handled differently from normal events: if an event arrives after its window has closed, it should be sent down a separate path.
I thought that .sideOutputLateData(..) would solve this for me, and it does under normal circumstances (i.e. with real-world data). But when I test it with fabricated data, it stops working.
I expect that something like:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setParallelism(1)

// (value, event timestamp in ms)
val events: DataStream[(Int, Long)] = env.fromElements(
  (1, 1L),
  (1, 15L),
  (1, 25L),
  (1, 8L) // late event
)

val lateEvents = OutputTag[(Int, Long)]("lateEvents")

val windowedSum = events
  .assignAscendingTimestamps(e => e._2)
  .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
  .sideOutputLateData(lateEvents)
  .sum(position = 0)

val lateEventsStream = windowedSum
  .getSideOutput(lateEvents)
  // Handle differently
  .map(e => (e._1 + 100, e._2))

windowedSum.print()
lateEventsStream.print()

// execute program
env.execute("Flink Scala watermarking test")
would result in:
[info] (1,1)
[info] (1,15)
[info] (1,25)
[info] (101,8)
Instead I get:
[info] (2,1)
[info] (1,15)
[info] (1,25)
If I use a socketTextStream as the source with the same data, it works as expected.
This tells me that somehow the watermark is not advancing as it should with very fast data inputs.
I tried adjusting setAutoWatermarkInterval to a very small value, but with no luck.
Am I missing something? How can I test my job?
Upvotes: 1
Views: 524
Reputation: 13680
Thanks to @Dominik Wosinski for pointing me in the right direction. For anyone else lost in the intricacies of the Flink documentation, I'll post my solution here:
As suspected, the problem was that fast input data didn't advance the watermark. This is because, by default, Flink only checks every 200 ms whether the watermark should advance. You can shorten this interval (at the cost of more load on the system) using
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(10) // or even lower
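Evidently even this is not enough for an input as fast as a 4-element list: presumably the source emits all four elements and finishes before a single periodic watermark is generated.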
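To see why the fabricated input behaves differently, here is a toy model in plain Scala. It does not use Flink, and all names in it (LatenessModel, run, watermarkPerEvent) are hypothetical. It assumes tumbling windows with zero allowed lateness, non-negative timestamps, and the rule that an event counts as late when the last timestamp of its window is at or below the current watermark.

```scala
object LatenessModel {
  // Toy model only, not Flink code. Assumes timestamps >= 0 and allowed lateness 0.
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  /** Returns (sum of values per window start, events classified as late). */
  def run(
      events: Seq[(Int, Long)],
      size: Long,
      watermarkPerEvent: Boolean
  ): (Map[Long, Int], Vector[(Int, Long)]) = {
    var watermark = Long.MinValue
    var sums = Map.empty[Long, Int]
    var late = Vector.empty[(Int, Long)]
    for ((value, ts) <- events) {
      val start = windowStart(ts, size)
      val lastTs = start + size - 1 // last timestamp that belongs to the window
      if (lastTs <= watermark) late = late :+ ((value, ts))
      else sums = sums.updated(start, sums.getOrElse(start, 0) + value)
      // The watermark only moves if one is emitted after the event.
      if (watermarkPerEvent) watermark = math.max(watermark, ts)
    }
    (sums, late)
  }

  def main(args: Array[String]): Unit = {
    val events = Seq((1, 1L), (1, 15L), (1, 25L), (1, 8L))
    println(run(events, 10L, watermarkPerEvent = true))
    println(run(events, 10L, watermarkPerEvent = false))
  }
}
```

In this model, the run with a watermark after every event classifies (1, 8) as late and every window sums to 1. The run in which the watermark never advances folds (1, 8) into the first window, giving a sum of 2, which matches the output seen in the question.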
The solution is to emit a watermark at each event (note that this is not recommended in a production environment).
To implement such a solution, we need to implement the WatermarkGenerator interface:
import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkOutput}

class MyPunctuatedWatermarkAssigner extends WatermarkGenerator[(Int, Long)] {
  override def onEvent(
      event: (Int, Long),
      eventTimestamp: Long,
      output: WatermarkOutput
  ): Unit = {
    // emit at every event
    output.emitWatermark(new Watermark(event._2))
  }

  // do nothing at AutoWatermarkInterval
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}
To assign this generator to a stream, we first need to create a WatermarkStrategy:
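import org.apache.flink.api.common.eventtime.{WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}

class MyStrategy extends WatermarkStrategy[(Int, Long)] {
  override def createWatermarkGenerator(
      context: WatermarkGeneratorSupplier.Context
  ): WatermarkGenerator[(Int, Long)] = new MyPunctuatedWatermarkAssigner
}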
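(This class can also override the optional createTimestampAssigner method. The default assigner keeps whatever timestamp the record already carries, so override it if timestamps have not been assigned upstream.)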
And then we can use it in the stream:
eventsStream
.assignTimestampsAndWatermarks(new MyStrategy())
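Putting the pieces together, a minimal end-to-end sketch of the test job might look like the following. It assumes Flink 1.11 or later (1.x) with the Scala DataStream API on the classpath. The names LateEventsTestSketch and PerEventWatermarks are made up for illustration, and the explicit timestamp assigner is an addition that is not part of the snippets above; it is there so the records carry event timestamps without a separate assignAscendingTimestamps call.

```scala
import org.apache.flink.api.common.eventtime._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object LateEventsTestSketch {

  // Hypothetical generator: emits a watermark after every event (testing only).
  class PerEventWatermarks extends WatermarkGenerator[(Int, Long)] {
    override def onEvent(event: (Int, Long), eventTimestamp: Long, output: WatermarkOutput): Unit =
      output.emitWatermark(new Watermark(eventTimestamp))

    // Nothing to do on the periodic timer.
    override def onPeriodicEmit(output: WatermarkOutput): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    env.setParallelism(1)

    // Explicit anonymous classes avoid overload ambiguity with Scala lambdas.
    val strategy = WatermarkStrategy
      .forGenerator[(Int, Long)](new WatermarkGeneratorSupplier[(Int, Long)] {
        override def createWatermarkGenerator(
            ctx: WatermarkGeneratorSupplier.Context
        ): WatermarkGenerator[(Int, Long)] = new PerEventWatermarks
      })
      .withTimestampAssigner(new SerializableTimestampAssigner[(Int, Long)] {
        // Assumption: the second tuple field is the event time in milliseconds.
        override def extractTimestamp(element: (Int, Long), recordTimestamp: Long): Long =
          element._2
      })

    val lateEvents = OutputTag[(Int, Long)]("lateEvents")

    val windowedSum = env
      .fromElements((1, 1L), (1, 15L), (1, 25L), (1, 8L))
      .assignTimestampsAndWatermarks(strategy)
      .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
      .sideOutputLateData(lateEvents)
      .sum(0)

    windowedSum.print()
    windowedSum.getSideOutput(lateEvents).map(e => (e._1 + 100, e._2)).print()

    env.execute("Late events test sketch")
  }
}
```

Because the watermark follows each event immediately, the element with timestamp 8 should reach the window operator after the watermark has already passed the end of its window, so it is expected to end up in the side output rather than in the first window's sum.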
Upvotes: 1
Reputation: 3864
You can try to use a WatermarkStrategy that emits a Watermark for every element instead of generating them periodically. You can do this by emitting the Watermark inside the onEvent method when implementing WatermarkGenerator, as described here. That is the best and most reliable approach for testing.
Upvotes: 2