Reputation: 761
My use case is quite simple: I receive events that carry an event timestamp and want to aggregate them based on event time. The output is then emitted periodically by a processing-time tumbling window every 10 minutes.
More specifically, the stream is keyed and I need to compute counts over 7 seconds.
I am not able to integration-test it (i.e., like a unit test, but end to end) because the input has fake event times, which won't trigger the processing-time window.
Here is my snippet:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val oneDayCounts = data
  .map(t => (t.key1, t.key2, 1L, t.timestampMs))
  .keyBy(0, 1)
  .timeWindow(Time.seconds(1))
  .sum(2)

val sevenDayCounts = oneDayCounts
  .keyBy(0, 1)
  .timeWindow(Time.seconds(3), Time.seconds(1))
  .sum(2)

// single reducer
sevenDayCounts
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(...)
I use EventTime as the time characteristic and set up the integration test with MiniClusterWithClientResource. I also created some fake data with event timestamps such as 1234L, 4567L, etc.
The EventTimeTrigger fires for the sum computations, but the TumblingProcessingTimeWindows window that follows does not trigger. I put a Thread.sleep of 30 s in the IT test code, but the window still had not fired after the 30 s.
Upvotes: 0
Views: 2335
Reputation: 43717
In general, it's a challenge to write meaningful tests for processing time windows, since they are inherently non-deterministic. This is one reason why event time windows are generally preferred.
It's also going to be difficult to put a sleep in the right place so that it has the desired effect. But one way to keep the job running long enough for the processing time window to fire would be to use a custom source that includes a sleep. Flink streaming jobs with finite sources shut themselves down once the input has been exhausted. One final watermark with the value MAX_WATERMARK gets sent through the pipeline, which triggers all event time windows, but processing time windows only fire if the job is still running when the appointed time arrives.
See this answer for an example of a hack that works around this.
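A custom source that includes a sleep could look roughly like the sketch below. It is only an illustration, assuming the legacy SourceFunction API of the Flink 1.x versions that still offer setStreamTimeCharacteristic; the Event case class, the LingeringSource name, and the lingerMs parameter are made up for the example and are not taken from the question or from Flink.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type with the fields the question's map() reads.
case class Event(key1: String, key2: String, timestampMs: Long)

// Emits a fixed list of events, then keeps the source (and thus the job)
// alive for lingerMs so that downstream processing-time timers can fire.
class LingeringSource(events: Seq[Event], lingerMs: Long)
    extends SourceFunction[Event] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    // Emit every test event with its (fake) event timestamp.
    events.foreach { e =>
      ctx.getCheckpointLock.synchronized {
        ctx.collectWithTimestamp(e, e.timestampMs)
      }
    }

    // Push event time to the end right away so the event-time windows fire
    // now instead of only when the source finishes.
    ctx.emitWatermark(Watermark.MAX_WATERMARK)

    // Stay alive (in wall-clock time) before returning. Once run() returns,
    // the job shuts down and pending processing-time windows are dropped.
    val deadline = System.currentTimeMillis() + lingerMs
    while (running && System.currentTimeMillis() < deadline) {
      Thread.sleep(50)
    }
  }

  override def cancel(): Unit = running = false
}
```

In the test this would be plugged in with something like env.addSource(new LingeringSource(events, 3000L)), with lingerMs chosen comfortably larger than the processing-time window size (1 second in the snippet from the question). The test remains timing-dependent, so the margin should be generous.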
Alternatively, you might take a look at https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java to see how processing time windows can be tested by mocking getCurrentProcessingTime.
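As a rough illustration of that idea, the sketch below controls getCurrentProcessingTime with a hand-written stub rather than a mocking library. It assumes a Flink 1.x version in which TumblingProcessingTimeWindows.of accepts a Time argument; the AssignerClockSketch object and the clockAt helper are hypothetical names. Note that it exercises only the window assigner, not the whole job.

```scala
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, WindowAssigner}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.JavaConverters._

object AssignerClockSketch {

  // Stub context whose processing-time clock is fixed at nowMs
  // (stands in for a mocked WindowAssignerContext).
  private def clockAt(nowMs: Long): WindowAssigner.WindowAssignerContext =
    new WindowAssigner.WindowAssignerContext {
      override def getCurrentProcessingTime(): Long = nowMs
    }

  def main(args: Array[String]): Unit = {
    val assigner = TumblingProcessingTimeWindows.of(Time.seconds(1))

    // The element's own timestamp is ignored by a processing-time assigner;
    // only the stubbed clock decides which window is chosen.
    val windows = assigner
      .assignWindows("any element", Long.MinValue, clockAt(1500L))
      .asScala
      .map(w => (w.getStart, w.getEnd))
      .toList

    // With a 1 s tumbling window, clock time 1500 ms falls into [1000, 2000).
    assert(windows == List((1000L, 2000L)))
  }
}
```

Because the clock is supplied by the test, the result is deterministic and no sleep is needed, which is the main advantage over the lingering-source approach.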
Upvotes: 1