Fernando Chong

Reputation: 31

Testing Flink jobs with MiniCluster: triggering a timer using processing time

Is there any way to control the processing time so that a timer fires when testing Flink jobs with MiniClusterWithClientResource?

I'm able to test both methods of the KeyedCoProcessFunction, i.e. processElement()... and the timer callback onTimer()..., in unit tests by using a test harness and controlling the processing time directly:

// trigger the processing-time timer by advancing the processing time of the operator directly
testHarness.setProcessingTime(300000)

This way I can trigger the timer at the specified time.
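For readers who have not used the harness approach described above, here is a minimal sketch of what such a unit test might look like. It assumes Flink 1.x with the flink-streaming-java test-jar on the test classpath. TimeoutFunction, HarnessSketch, the identity key selector and the "timeout:" output format are hypothetical names made up for illustration; they are not the asker's code.

```scala
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// Hypothetical function: every element on input 1 registers a
// processing-time timer 300000 ms in the future.
class TimeoutFunction extends KeyedCoProcessFunction[String, String, String, String] {
  override def processElement1(
      value: String,
      ctx: KeyedCoProcessFunction[String, String, String, String]#Context,
      out: Collector[String]): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    ctx.timerService().registerProcessingTimeTimer(now + 300000L)
  }

  // Input 2 is ignored in this sketch.
  override def processElement2(
      value: String,
      ctx: KeyedCoProcessFunction[String, String, String, String]#Context,
      out: Collector[String]): Unit = ()

  override def onTimer(
      timestamp: Long,
      ctx: KeyedCoProcessFunction[String, String, String, String]#OnTimerContext,
      out: Collector[String]): Unit =
    out.collect(s"timeout:${ctx.getCurrentKey}")
}

object HarnessSketch {
  // Runs the function in a test harness and returns whatever it emitted.
  def firedOutputs(): java.util.List[String] = {
    // Each record is used as its own key (an assumption for brevity).
    val byValue = new KeySelector[String, String] {
      override def getKey(value: String): String = value
    }
    val harness =
      new KeyedTwoInputStreamOperatorTestHarness[String, String, String, String](
        new KeyedCoProcessOperator[String, String, String, String](new TimeoutFunction),
        byValue, byValue, Types.STRING)
    harness.open()
    harness.setProcessingTime(0L)       // pin the harness clock to a known start
    harness.processElement1("k1", 0L)   // registers a timer for 0 + 300000
    harness.setProcessingTime(300000L)  // advance the clock to the timer's due time
    val out = harness.extractOutputValues()
    harness.close()
    out
  }
}
```

The harness owns the operator's clock, which is why setProcessingTime can fire the timer deterministically. A job running on a MiniCluster uses the system clock instead, which is what makes the end-to-end case harder.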

However, what I need now is to trigger the timer in an end-to-end Flink job test that uses MiniClusterWithClientResource:

val flinkCluster = new MiniClusterWithClientResource...

and to be able to advance the processing time so that the onTimer method fires.

Upvotes: 0

Views: 1068

Answers (1)

Fernando Chong

Reputation: 31

Adding a one-second Thread.sleep(1000) to the SourceFunction class, after all messages have been sent, solved the problem.

class MySourceFunction() extends RichParallelSourceFunction[]{
  ...

  // one-time delay after all messages have been sent
  Thread.sleep(1000)
}
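A fuller sketch of the workaround above, under some assumptions: Flink 1.x (where RichParallelSourceFunction is still available), String records, and a hypothetical class name DelayedSource with a configurable lingerMillis parameter. The likely reason the delay helps is that a bounded test job finishes as soon as its source returns, and processing-time timers that are still pending at that point may never fire. If so, the delay has to be longer than the timer interval being tested.

```scala
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

// Hypothetical test source: emits a fixed list of records, then keeps the
// source (and therefore the job) alive for lingerMillis before returning.
// As a parallel source, every subtask emits the full list.
class DelayedSource(records: List[String], lingerMillis: Long = 1000L)
    extends RichParallelSourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val it = records.iterator
    while (running && it.hasNext) {
      // Emit under the checkpoint lock, as the SourceFunction contract asks.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(it.next())
      }
    }
    // One-time delay after all messages have been sent, taken outside the
    // lock so that timers and checkpoints are not blocked while waiting.
    if (running) Thread.sleep(lingerMillis)
  }

  override def cancel(): Unit = running = false
}
```

In a test this could be wired in with something like env.addSource(new DelayedSource(List("a", "b"), 2000L)). Because the timer then fires on the real system clock, the test depends on wall-clock timing, unlike the harness-based unit test.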

Upvotes: 3
