Reputation: 307
I am trying to aggregate two streams like this:
val joinedStream = finishResultStream.keyBy(_.searchId)
.connect(startResultStream.keyBy(_.searchId))
.process(new SomeCoProcessFunction)
and then processing them in the SomeCoProcessFunction class like this:
class SomeCoProcessFunction extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {
override def processElement1(finished: SearchFinished, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {
// aggregating some "finished" data ...
}
override def processElement2(created: SearchCreated, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {
val timerService = ctx.timerService()
timerService.registerEventTimeTimer(System.currentTimeMillis + 5000)
// aggregating some "created" data ...
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext, out: Collector[SearchAggregated]): Unit = {
val watermark: Long = ctx.timerService().currentWatermark()
println(s"watermark!!!! $watermark")
// clean up the state
}
}
What I want is to clean up the state after a certain time (5000 milliseconds), which is what onTimer is meant to be used for. But since it never fires, I am asking myself what I am doing wrong here.
Thanks in advance for any hint.
UPDATE:
The solution was to register the timer like this (thanks to both fabian-hueske and Beckham):
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5000)
I still haven't really figured out what timerService.registerEventTimeTimer does. The watermark returned by ctx.timerService().currentWatermark() always shows -9223372036854775808, no matter how long ago the event-time timer was registered.
Upvotes: 0
Views: 1960
Reputation: 18987
The problem is that you are registering an event-time timer (timerService.registerEventTimeTimer) with a processing-time timestamp (System.currentTimeMillis + 5000).
System.currentTimeMillis returns the current machine time, but event time is not based on the machine time. It is based on the time computed from watermarks.
You should either register a processing-time timer or register an event-time timer with an event-time timestamp. You can get the timestamp of the current watermark or the timestamp of the current record from the Context object that is passed as a parameter to processElement1() and processElement2().
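As a rough sketch of the two options, assuming the streams are keyed as in the question (timers only work on keyed streams): the case classes below are placeholders, since only searchId is known from the question, and the class name CleanupCoProcessFunction and the cleanupDelayMs value are made up for illustration.

```scala
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Placeholder event types: only searchId appears in the question, the rest is assumed.
case class SearchFinished(searchId: String)
case class SearchCreated(searchId: String)
case class SearchAggregated(searchId: String)

class CleanupCoProcessFunction
    extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {

  // Hypothetical delay before the state is cleaned up.
  private val cleanupDelayMs = 5000L

  override def processElement1(
      finished: SearchFinished,
      ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context,
      out: Collector[SearchAggregated]): Unit = {
    // aggregate "finished" data ...
  }

  override def processElement2(
      created: SearchCreated,
      ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context,
      out: Collector[SearchAggregated]): Unit = {
    val timerService = ctx.timerService()

    // Option A: processing-time timer, measured against the machine clock.
    timerService.registerProcessingTimeTimer(
      timerService.currentProcessingTime() + cleanupDelayMs)

    // Option B: event-time timer, measured against the watermark.
    // It fires only once the watermark passes this timestamp, so the job
    // must actually produce watermarks for it to ever trigger.
    // timerService.registerEventTimeTimer(
    //   timerService.currentWatermark() + cleanupDelayMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext,
      out: Collector[SearchAggregated]): Unit = {
    // clean up the state ...
  }
}
```

Option A matches the fix described in the question's update. Option B only makes sense if timestamps and watermarks are assigned on both input streams; a watermark that stays at Long.MinValue (-9223372036854775808) suggests that none have been emitted yet.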
Upvotes: 2
Reputation: 859
I see that you are using System.currentTimeMillis, which might not match the TimeCharacteristic (event time, processing time, ingestion time) that your Flink job uses.
Try getting the timestamp of the event with ctx.timestamp() and then adding the 5000 ms on top of it.
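A minimal sketch of that calculation, kept free of Flink imports so it can be tried on its own. The TimerTimestamps object and its CleanupDelayMs constant are hypothetical helpers, not part of Flink. The null check is there because ctx.timestamp() returns a boxed java.lang.Long, which can be null when the record carries no timestamp.

```scala
object TimerTimestamps {
  // Hypothetical delay, taken from the 5000 ms mentioned in the question.
  val CleanupDelayMs: Long = 5000L

  // Returns the event-time timestamp at which cleanup should run,
  // or None if the record has no timestamp attached.
  def eventTimeCleanup(recordTimestamp: java.lang.Long): Option[Long] =
    Option(recordTimestamp).map(_.longValue() + CleanupDelayMs)
}

// Intended use inside processElement2 (assumes the Flink Context from the question):
//   TimerTimestamps.eventTimeCleanup(ctx.timestamp())
//     .foreach(ts => ctx.timerService().registerEventTimeTimer(ts))
```

Registering the timer this way ties it to the record's own time, so it fires when the watermark passes that point rather than after 5 seconds of wall-clock time.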
Upvotes: 3