ngibanel

Reputation: 93

Flink - Emit last window when there are no following events

I have a stream of events Event(Id, Type, Date), and I want to process those events grouped by (Id, Type) and by session of activity.


For instance, from the events

Event1: Event(1, T1, 2018-01-24)
Event2: Event(2, T1, 2018-01-26)
Event3: Event(1, T2, 2018-01-28)
Event4: Event(1, T2, 2018-01-28)
...

I expect to get the following windows:

Window1 with Event1
Window2 with Event2
Window3 with Event3 and Event4
...

From my understanding, I should be able to do that with event-time session windows on a keyed stream. But with my code, only the first window (Window1), containing the first event (Event1), is printed.

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

environment
    .addSource(kafkaConsumer.setStartFromEarliest())
    .assignTimestampsAndWatermarks(<timestamp assigner>)
    .keyBy(e => (e.getId, e.getType))
    .window(EventTimeSessionWindows.withGap(Time.days(1)))
    .apply(new WindowFunction[Event, String, (String, String), TimeWindow]() {
      override def apply(key: (String, String), window: TimeWindow, input: Iterable[Event], out: Collector[String]): Unit = {
        var count = 0L
        for (in <- input) {
          count = count + 1
        }
        out.collect(s"Window $window count: $count")
      }
    })
    .print()

Is this the appropriate way of dealing with historical events and session windows?

Upvotes: 6

Views: 1880

Answers (2)

杨李思

Reputation: 21

org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows is what you want. Unlike EventTimeSessionWindows, ProcessingTimeSessionWindows fires as soon as the gap has elapsed in processing time, without waiting for the next event to arrive. Note that sessions are then formed by arrival time rather than by the Date carried in each event.

Upvotes: 1

Dawid Wysakowicz

Reputation: 3422

The problem in your case is that the watermark is always generated from incoming events. If there are no incoming events, the watermark does not progress. In your example only Window1 is emitted, because only Event1 is followed by an event whose timestamp advances the watermark beyond the session gap. For the remaining three events there are no such elements; for Event3 and Event4 there are no following events at all. Also, because the stream is keyed, elements with different keys are processed independently. The watermark does not advance in this case, and therefore the windows are not emitted.
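
The effect described above can be reproduced outside Flink with a small model. The sketch below is plain Scala, not Flink code, and rests on several assumptions: timestamps are whole days, the watermark equals the largest timestamp seen minus a hypothetical out-of-orderness allowance of 2 days (the question does not show its timestamp assigner), and a session fires once the watermark reaches its end. All names (SessionWindowSketch, run, extraWatermark) are illustrative only.

```scala
object SessionWindowSketch {
  // Timestamps are whole days to keep the arithmetic readable (assumed unit).
  final case class Event(id: Int, tpe: String, day: Long)
  final case class Session(key: (Int, String), start: Long, end: Long, count: Int)

  /** Simplified model of keyed event-time session windows.
    * The watermark is (max timestamp seen) - maxOutOfOrderness and moves only
    * when an event arrives, or once more if extraWatermark is supplied.
    * Returns (fired sessions, sessions still open).
    */
  def run(events: Seq[Event], gap: Long, maxOutOfOrderness: Long,
          extraWatermark: Option[Long] = None): (Vector[Session], Vector[Session]) = {
    var open = Vector.empty[Session]
    var fired = Vector.empty[Session]
    var maxSeen = Long.MinValue

    // Fire every open session whose end the watermark has reached.
    def advance(watermark: Long): Unit = {
      val (done, rest) = open.partition(_.end <= watermark)
      fired = fired ++ done
      open = rest
    }

    for (e <- events) {
      val key = (e.id, e.tpe)
      // Extend an open session of the same key if the event falls within its
      // gap; otherwise start a new session for that key.
      open.indexWhere(s => s.key == key && e.day <= s.end) match {
        case -1 =>
          open = open :+ Session(key, e.day, e.day + gap, 1)
        case i =>
          val s = open(i)
          open = open.updated(i, s.copy(end = math.max(s.end, e.day + gap), count = s.count + 1))
      }
      // Only an incoming event can move the watermark forward.
      maxSeen = math.max(maxSeen, e.day)
      advance(maxSeen - maxOutOfOrderness)
    }
    // Optional watermark from outside the event flow (hypothetical idle advance).
    extraWatermark.foreach(advance)
    (fired, open)
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(1, "T1", 24), Event(2, "T1", 26), Event(1, "T2", 28), Event(1, "T2", 28))
    // Watermark driven by events only.
    val (fired, open) = run(events, gap = 1, maxOutOfOrderness = 2)
    println(s"fired: $fired, still open: $open")
    // Same input, plus one extra watermark at day 29.
    val (firedAll, openAll) = run(events, gap = 1, maxOutOfOrderness = 2, extraWatermark = Some(29L))
    println(s"fired: $firedAll, still open: $openAll")
  }
}
```

Under these assumptions the first call fires only the (1, T1) session and leaves the other two open, which matches what the question observes. The second call, which feeds one extra watermark at day 29, fires all three. In a real job that extra watermark would have to come from the watermark assigner, for example one that advances the watermark on a timer while the source is idle.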

Upvotes: 2
