I'm trying to create TumblingEventTimeWindows on event time for records from Kafka, where a record is expected every minute. Instead of per-minute granularity, I want to down-sample the data to a coarser time frame, and I'm trying the following:
```kotlin
// Tolerate events arriving up to 20 seconds out of order
val strategy = WatermarkStrategy.forBoundedOutOfOrderness<POJO>(Duration.ofSeconds(20))
    // Use the event's own timestamp (epoch millis) as its event time
    .withTimestampAssigner { event, _ -> event.timestamp }

val source = env.fromSource(myKafkaSource, strategy, "MyOwnKafkaSource")

val streamingSource = source
    .flatMap(<MyMapperFunction>)
    .keyBy { x -> "${x.eventId}_${x.eventName}" }
    .window(TumblingEventTimeWindows.of(<My required sample size>))
    .process(<My process function>)
```
With this pipeline I consume from the Kafka topic, but TumblingEventTimeWindows never produces any results, so my process function receives no output. If I switch to TumblingProcessingTimeWindows, everything works fine.
Do I need to add anything else here, or should I increase the watermark's out-of-orderness duration?
Can someone help me figure out what I'm missing?
Thanks in advance.
Upvotes: 2
Views: 841
Reputation: 43499
If the window isn't producing the expected results, then the watermarks aren't advancing far enough to trigger it. This can be caused by idle sources or idle Kafka partitions, which hold back the overall watermark (this can be handled with the withIdleness option on the watermark strategy).
Also, given how the watermark generator is configured, you need events with timestamps at least 20 seconds into the next window before the previous window is triggered.
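To make the arithmetic concrete, here is a simplified model in plain Kotlin (not Flink's own code) of the two conditions above. The function names are hypothetical. The formula `maxTimestamp - outOfOrderness - 1` follows Flink's bounded-out-of-orderness generator, a window fires once the watermark reaches the window's last timestamp, and taking the minimum across partitions approximates how per-partition watermarks are combined (idleness handling is deliberately left out).

```kotlin
// Simplified model (not Flink code) of how a bounded-out-of-orderness
// watermark triggers tumbling event-time windows. Times are epoch millis.

// Start of the tumbling window containing `ts`
// (zero window offset and non-negative timestamps assumed).
fun windowStart(ts: Long, sizeMs: Long): Long = ts - ts % sizeMs

// Watermark after the largest timestamp seen so far is `maxTs`:
// the bounded-out-of-orderness generator emits maxTs - bound - 1.
fun watermark(maxTs: Long, boundMs: Long): Long = maxTs - boundMs - 1

// A window [start, start + size) fires once the watermark reaches
// its last timestamp, start + size - 1.
fun fires(start: Long, sizeMs: Long, wm: Long): Boolean = wm >= start + sizeMs - 1

// Smallest event timestamp whose arrival fires the window:
// window end plus the out-of-orderness bound.
fun minTriggeringTimestamp(start: Long, sizeMs: Long, boundMs: Long): Long =
    start + sizeMs + boundMs

// The combined watermark is the minimum across input partitions, so a
// partition that never receives data (modelled as Long.MIN_VALUE) holds it back.
fun combinedWatermark(perPartition: List<Long>): Long =
    perPartition.minOrNull() ?: Long.MIN_VALUE
```

For example, with a hypothetical 5-minute window (300000 ms) and the 20-second bound from the question, the window [0, 300000) does not fire when the event at 300000 arrives (its watermark is 279999); it fires only once an event with a timestamp of at least 320000 arrives, which with one record per minute is the record at minute 6. If one partition has produced nothing, `combinedWatermark` stays at `Long.MIN_VALUE` and no window fires, which is the situation withIdleness is meant to address.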
You can examine the current watermarks in the Flink web UI to check if they are indeed too small to trigger the window.