zavalit
zavalit

Reputation: 307

Flink's CoProcessFunction doesn't trigger onTimer

I try to aggregate two streams like that

val joinedStream = finishResultStream.keyBy(_.searchId)
  .connect(startResultStream.keyBy(_.searchId))
  .process(new SomeCoProcessFunction)

and then working on them in SomeCoProcessFunction class like that

class SomeCoProcessFunction extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {

   override def processElement1(finished: SearchFinished, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = { 

       // aggregating some "finished" data ...

   }

   override def processElement2(created: SearchCreated, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {

       val timerService = ctx.timerService()
       timerService.registerEventTimeTimer(System.currentTimeMillis + 5000)

       // aggregating some "created" data ...
   }

   override def onTimer(timestamp: Long, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext, out: Collector[SearchAggregated]): Unit = {

       val watermark: Long = ctx.timerService().currentWatermark()
       println(s"watermark!!!! $watermark")

       // clean up the state

   }

What I want is to clean up the state after a certain time( 5000 Milliseconds), and that is what onTimer have to be used for. But since it never get fired, I kinda ask my self what am I doing wrong here?

Thanks in advance for any hint.

UPDATE:

Solution was to set timeService like that (tnx to both fabian-hueske and Beckham):

timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5000)

I still didn't really figure out what timerService.registerEventTimeTimer does, watermark ctx.timerService().currentWatermark() shows always -9223372036854775808 now matter how long before EventTimer was registered.

Upvotes: 0

Views: 1960

Answers (2)

Fabian Hueske
Fabian Hueske

Reputation: 18987

The problem is that you are registering an event-time timer (timerService.registerEventTimeTimer) with a processing-time timestamp (System.currentTimeMillis + 5000).

System.currentTimeMillis returns the current machine time but event-time is not based on the machine time but on the time computed from watermarks.

Either you should register a processing-timer or register an event-time timer with an event-time timestamp. You can get the timestamp of the current watermark or the timestamp of the current record from the Context object that is passed as a parameter to processElement1() and processElement2().

Upvotes: 2

Alex
Alex

Reputation: 859

I see that you are using System.currentTimeMillis which might be different from the TimeCharacteristic (event time, processing time, ingestion time) that your Flink job uses.

Try getting the timestamp of the event ctx.timestamp() then add the 5000ms on top of it.

Upvotes: 3

Related Questions