Reputation: 31
Is there any way to control the processing time so that a timer fires when testing Flink jobs with MiniClusterWithClientResource?
I can test both methods of the KeyedCoProcessFunction, i.e. processElement()... and the timer callback onTimer()..., in unit tests by using a test harness and controlling the processing time, i.e.:
// Trigger the processing-time timer by advancing the processing time of the operator directly
testHarness.setProcessingTime(300000)
That way I can trigger the timer at the specified time.
However, what I need now is to trigger the timer in an end-to-end Flink job test that uses a mini cluster (MiniClusterWithClientResource):
val flinkCluster = new MiniClusterWithClientResource...
and to be able to advance the processing time so that the onTimer method fires.
Upvotes: 0
Views: 1068
Reputation: 31
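Adding a one-second delay, Thread.sleep(1000), to the SourceFunction class after all messages have been sent solved the problem. The delay keeps the source, and therefore the job, running long enough for the pending processing-time timer to fire before the job finishes.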
class MySourceFunction() extends RichParallelSourceFunction[]{
...
// One-time delay after all messages have been sent, so the job stays alive until the timer fires
Thread.sleep(1000)
}
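For reference, a fuller version of that source might look like the sketch below. It is only an illustration under assumptions: it targets the Flink 1.x SourceFunction API, and the class name DelayedFinishSource, the String element type, and the lingerMillis parameter are hypothetical, not taken from the original job.

```scala
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

// Hypothetical bounded test source (names and element type are assumptions):
// emits a fixed list of messages, then lingers before finishing.
class DelayedFinishSource(messages: Seq[String], lingerMillis: Long)
    extends RichParallelSourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val it = messages.iterator
    while (running && it.hasNext) {
      // Emit under the checkpoint lock, as the SourceFunction contract recommends.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(it.next())
      }
    }
    // One-time delay after all messages have been sent: the source (and the job)
    // stays alive so that registered processing-time timers get a chance to fire.
    if (running) Thread.sleep(lingerMillis)
  }

  override def cancel(): Unit = running = false
}
```

It would be wired in with something like env.addSource(new DelayedFinishSource(Seq("a", "b"), 1000L)). The delay has to be longer than the timer interval registered in processElement, and with a parallelism above 1 every subtask emits the full list, so a test would probably set the source parallelism to 1.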
Upvotes: 3