Justin Lin

Reputation: 761

TumblingProcessingTimeWindows processing with event time characteristic is not triggered

My use case is quite simple: I receive events that contain an "event timestamp" and want them aggregated based on event time. The output should be emitted periodically by a processing-time tumbling window every 10min.

More specifically, the stream of data is keyed, and I need to compute counts over 7 seconds using:

  1. a tumbling window of 1 second
  2. a sliding window for counting 7 seconds with an advance of 1 second
  3. a windowall to output all counts every 1s

I am not able to integration-test it (i.e., similar to a unit test, but end-to-end), because the input has fake event times, which won't trigger the processing-time window.

Here is my snippet:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val oneDayCounts = data
  .map(t => (t.key1, t.key2, 1L, t.timestampMs))
  .keyBy(0, 1)
  .timeWindow(Time.seconds(1))
  .sum(2)

val sevenDayCounts = oneDayCounts
  .keyBy(0,1)
  .timeWindow(Time.seconds(3), Time.seconds(1))
  .sum(2)

// single reducer
sevenDayCounts
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(...)

I use EventTime as the time characteristic and set up the integration test code with MiniClusterWithClientResource. I also created some fake data with event timestamps such as 1234L, 4567L, etc.

The EventTimeTrigger fires for the sum computations, but the subsequent TumblingProcessingTimeWindows window does not trigger. I put a Thread.sleep of 30s in the IT code, but the window still had not triggered after the 30s.

Upvotes: 0

Views: 2335

Answers (1)

David Anderson

Reputation: 43717

In general it's a challenge to write meaningful tests for processing time windows, since they are inherently non-deterministic. This is one reason why event time windows are generally preferred.

It's also going to be difficult to put a sleep in the right place so that it has the desired effect. Flink streaming jobs with finite sources shut themselves down once the input has been exhausted. One final watermark with the value MAX_WATERMARK gets sent through the pipeline, which triggers all event time windows, but processing time windows only fire if the job is still running when the appointed time arrives. One way to keep the job running long enough for the processing time window to fire is to use a custom source that includes a sleep.

See this answer for an example of a hack that works around this.
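
A minimal sketch of a source that sleeps before finishing, assuming the legacy SourceFunction API (which matches the timeWindow / setStreamTimeCharacteristic style used in the question). The class name LingeringSource and the lingerMs parameter are made up for illustration, and the input is assumed to be sorted by timestamp:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical test source: emits a fixed list of (element, eventTimestampMs)
// pairs, then keeps the job alive for `lingerMs` so that downstream
// processing time timers get a chance to fire.
class LingeringSource[T](events: Seq[(T, Long)], lingerMs: Long)
    extends SourceFunction[T] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    events.foreach { case (element, timestampMs) =>
      ctx.getCheckpointLock.synchronized {
        ctx.collectWithTimestamp(element, timestampMs)
        // Assumes ascending timestamps; "- 1" keeps elements that share
        // this timestamp from being treated as late.
        ctx.emitWatermark(new Watermark(timestampMs - 1))
      }
    }

    // Flush the event time windows now, so their results reach the
    // processing time window while the job is still running.
    ctx.getCheckpointLock.synchronized {
      ctx.emitWatermark(Watermark.MAX_WATERMARK)
    }

    // Stay alive instead of returning immediately.
    val deadline = System.currentTimeMillis() + lingerMs
    while (running && System.currentTimeMillis() < deadline) {
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = running = false
}
```

In the test this could replace the fake input, e.g. env.addSource(new LingeringSource(fakeEvents, 5000L)), where fakeEvents is a placeholder for the test data and lingerMs is comfortably longer than the processing time window size. The result is still timing dependent, so it remains a workaround rather than a deterministic test.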

Alternatively, you might take a look at https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java to see how processing time windows can be tested by mocking getCurrentProcessingTime.
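
A rough sketch of that idea, using a hand-written WindowAssignerContext in place of a mocking library (the linked test may do this differently). It only exercises the window assigner with a fixed clock; it does not run a job or fire a trigger. The object name and the value 1500 are arbitrary, and TumblingProcessingTimeWindows.of(Time) is assumed to be available in the Flink version in use:

```scala
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, WindowAssigner}
import org.apache.flink.streaming.api.windowing.time.Time

object ProcessingTimeAssignerSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for a mocked context: the "current processing time" is pinned
    // to 1500 ms, so the assignment below no longer depends on the wall clock.
    val fixedClock = new WindowAssigner.WindowAssignerContext {
      override def getCurrentProcessingTime(): Long = 1500L
    }

    val assigner = TumblingProcessingTimeWindows.of(Time.seconds(1))

    // The element and its event timestamp are ignored by a processing time
    // assigner; only the context's clock matters.
    val windows = assigner.assignWindows("any element", Long.MinValue, fixedClock)

    // With 1 s tumbling windows and a clock of 1500 ms, the single assigned
    // window should cover [1000, 2000).
    val window = windows.iterator().next()
    println(s"start=${window.getStart}, end=${window.getEnd}")
  }
}
```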

Upvotes: 1
