user3497321

Reputation: 523

Flink - Creating TumblingEventTimeWindows on event time

I'm trying to use TumblingEventTimeWindows on event-time data from Kafka, where a record is expected every minute. Rather than keeping per-minute granularity, I want to down-sample the records over a larger time frame, and I'm trying the following:

val strategy = WatermarkStrategy.forBoundedOutOfOrderness<POJO>(Duration.ofSeconds(20))
     .withTimestampAssigner{ event, timestamp -> event.timestamp }

val source = env.fromSource(myKafkaSource, strategy, "MyOwnKafkaSource")

val streamingSource = source
    .flatMap(<MyMapperFunction>)
    .keyBy { x -> "${x.eventId}_${x.eventName}" }
    .window(TumblingEventTimeWindows.of(<My required sample size>))
    .process(<My process function>)

This is how I want to consume from the Kafka topic. With TumblingEventTimeWindows the windows never produce any results, so I don't see any output from the process function. If I use TumblingProcessingTimeWindows instead, everything works fine.

Do I need to add anything else here, or should I increase the duration for the watermark?

Can someone please help me figure out what I'm missing?

Thanks in advance.

Upvotes: 2

Views: 841

Answers (1)

David Anderson

Reputation: 43499

If the window isn't producing the expected results, then the watermarks aren't advancing far enough to trigger the window. This can be caused by:

  • empty or idle Kafka partitions (can be solved by using the withIdleness option on the watermark strategy)
  • some or all of the Kafka partitions lacking input events with sufficiently large timestamps (can be solved by supplying more data with larger timestamps)
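
As a rough sketch of the first fix, the watermark strategy from the question could be extended with withIdleness. The POJO class shown here and the one-minute idle timeout are assumptions made for illustration; they are not taken from the question, and the timeout should be tuned to how long a partition can realistically stay silent.

```kotlin
import java.time.Duration
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
import org.apache.flink.api.common.eventtime.WatermarkStrategy

// Hypothetical event type; the question only shows that it has
// eventId, eventName and timestamp fields.
data class POJO(val eventId: String, val eventName: String, val timestamp: Long)

val strategy: WatermarkStrategy<POJO> = WatermarkStrategy
    .forBoundedOutOfOrderness<POJO>(Duration.ofSeconds(20))
    // Use the event's own timestamp field (epoch milliseconds) as event time.
    .withTimestampAssigner(SerializableTimestampAssigner<POJO> { event, _ -> event.timestamp })
    // Assumed timeout: a partition that delivers nothing for one minute is
    // marked idle, so it no longer holds back the overall watermark.
    .withIdleness(Duration.ofMinutes(1))
```

The strategy would then be passed to env.fromSource exactly as in the question.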

Given how the watermark generator is configured, you will need events with timestamps at least 20 seconds into the next window in order to trigger the previous window.
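
The arithmetic behind that 20-second figure can be sketched in plain Kotlin, without Flink. This is a simplified model, assuming a single partition and a hypothetical 5-minute window (the question does not state the window size): a bounded-out-of-orderness generator emits a watermark of the largest timestamp seen minus the bound minus 1 ms, and an event-time window [start, end) fires once the watermark reaches end - 1.

```kotlin
import java.time.Duration

// Watermark produced by a bounded-out-of-orderness generator:
// largest timestamp seen so far, minus the out-of-orderness bound, minus 1 ms.
fun watermarkFor(maxTimestampSeen: Long, outOfOrderness: Duration): Long =
    maxTimestampSeen - outOfOrderness.toMillis() - 1

// An event-time window [start, end) fires once the watermark reaches end - 1.
fun windowFires(windowEnd: Long, watermark: Long): Boolean =
    watermark >= windowEnd - 1

fun main() {
    val bound = Duration.ofSeconds(20)
    // Hypothetical 5-minute window covering [0, 300000) ms.
    val windowEnd = Duration.ofMinutes(5).toMillis()
    for (ts in listOf(299_000L, 310_000L, 320_000L)) {
        val wm = watermarkFor(ts, bound)
        println("event ts=$ts watermark=$wm fires=${windowFires(windowEnd, wm)}")
    }
}
```

In this model only the event at 320000 ms, which is 20 seconds past the end of the window, pushes the watermark far enough for the window to fire. With several Kafka partitions the operator uses the minimum watermark across them, which is why one slow or empty partition can hold every window back.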

You can examine the current watermarks in the Flink web UI to check if they are indeed too small to trigger the window.

Upvotes: 3
