Reputation: 93
I have a stream containing events Event(Id, Type, Date), and I want to process those events grouped by (Id, Type) and by session of activity.
For instance, from events
Event1: Event(1, T1, 2018-01-24)
Event2: Event(2, T1, 2018-01-26)
Event3: Event(1, T2, 2018-01-28)
Event4: Event(1, T2, 2018-01-28)
...
I expect to have the following windows:
Window1 with Event1
Window2 with Event2
Window3 with Event3 and Event4
...
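The expected grouping can be checked without Flink. The plain-Scala sketch below is a hypothetical model (the `Event` case class, `SessionSketch`, and `gapDays` are illustrative names, not the question's types): it groups events by (Id, Type) and starts a new session whenever the next event of the same key comes at least one gap after the previous one, which is assumed here to mirror how merging session windows behave.

```scala
import java.time.LocalDate

// Hypothetical stand-in for the question's Event(Id, Type, Date);
// `tpe` is used because `type` is a reserved word in Scala.
final case class Event(id: String, tpe: String, date: LocalDate)

object SessionSketch {
  // Groups events by (id, tpe), then splits each key's events into sessions.
  // Each session ends at (last event + gap); a later event of the same key joins
  // the session only if it arrives strictly before that end.
  def sessions(events: Seq[Event], gapDays: Long): Map[(String, String), List[List[Event]]] =
    events.groupBy(e => (e.id, e.tpe)).map { case (key, evs) =>
      val sorted = evs.sortBy(_.date.toEpochDay).toList
      // Accumulate sessions newest-first; each session is also kept newest-first.
      val built = sorted.foldLeft(List.empty[List[Event]]) {
        case (Nil, e) => List(List(e))
        case (current :: done, e) =>
          if (e.date.toEpochDay - current.head.date.toEpochDay < gapDays) (e :: current) :: done
          else List(e) :: current :: done
      }
      key -> built.map(_.reverse).reverse
    }

  // The four events from the question.
  val sample: Seq[Event] = Seq(
    Event("1", "T1", LocalDate.parse("2018-01-24")),
    Event("2", "T1", LocalDate.parse("2018-01-26")),
    Event("1", "T2", LocalDate.parse("2018-01-28")),
    Event("1", "T2", LocalDate.parse("2018-01-28"))
  )

  def main(args: Array[String]): Unit =
    sessions(sample, gapDays = 1).foreach { case (key, ss) =>
      println(s"$key -> ${ss.map(_.size)}")
    }
}
```

Running `SessionSketch.main` prints one line per key: (1, T1) and (2, T1) each get one single-event session, and (1, T2) gets one session of two events, matching Window1 to Window3 above.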
From my understanding, I should be able to do that with event-time session windows on a keyed stream. But with my code, only the first window (Window1), containing the first event (Event1), is printed:
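// imports for the Flink 1.x Scala DataStream API
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
environment
  .addSource(kafkaConsumer.setStartFromEarliest())
  .assignTimestampsAndWatermarks(<timestamp assigner>)
  // one independent set of sessions per (Id, Type)
  .keyBy(e => (e.getId, e.getType))
  // a session closes after a gap of one day (in event time) without events for that key
  .window(EventTimeSessionWindows.withGap(Time.days(1)))
  .apply(new WindowFunction[Event, String, (String, String), TimeWindow]() {
    override def apply(key: (String, String), window: TimeWindow, input: Iterable[Event], out: Collector[String]): Unit = {
      // count the events in this session
      var count = 0L
      for (in <- input) {
        count = count + 1
      }
      out.collect(s"Window $window count: $count")
    }
  })
  .print()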
Is this the appropriate way of dealing with historical events and session windows?
Upvotes: 6
Views: 1880
Reputation: 21
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows is what you want. Unlike EventTimeSessionWindows, ProcessingTimeSessionWindows fires as soon as the gap has elapsed in processing (wall-clock) time, without waiting for the next event to arrive.
Upvotes: 1
Reputation: 3422
The problem in your case is that the watermark is always generated from incoming events: if there are no incoming events, the watermark does not progress. In your example only Window1 is emitted because only Event1 is followed by another event whose timestamp advances the watermark beyond the session gap. For the remaining three events there are no such elements; for Event3 and Event4 there are no later events at all. Also, because the stream is keyed, elements with different keys are processed independently. The watermark does not advance in this case, and therefore the windows are not emitted.
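The watermark mechanism can be illustrated with a deliberately simplified, hypothetical model in plain Scala (not Flink code): there is a single input, the watermark is simply the largest timestamp seen so far, and a session [start, end) is emitted once the watermark reaches `end`. `Ev`, `WatermarkSketch`, and `endOfInput` are illustrative names, and days are plain `Long` values (the day of the month from the question). Real Flink watermarks come from the timestamp assigner, usually periodically, so treat this as a sketch of the idea rather than of Flink's exact behavior.

```scala
// Hypothetical, simplified model of event-time session windows (not Flink's implementation).
final case class Ev(name: String, key: (String, String), day: Long)

object WatermarkSketch {
  private final case class Session(start: Long, end: Long, names: Vector[String])

  /** Returns the event names of every emitted session, in emission order. */
  def run(events: Seq[Ev], gap: Long, endOfInput: Boolean): Vector[Vector[String]] = {
    var watermark = Long.MinValue
    var open = Map.empty[(String, String), Session]
    var fired = Vector.empty[Vector[String]]

    // Raise the watermark, then emit and drop every session whose end it has reached.
    def advance(wm: Long): Unit = {
      watermark = math.max(watermark, wm)
      val (ready, pending) = open.partition { case (_, s) => s.end <= watermark }
      fired ++= ready.values.toVector.sortBy(_.end).map(_.names)
      open = pending
    }

    for (e <- events) {
      advance(e.day) // in this model the watermark only moves when an event arrives
      val w = Session(e.day, e.day + gap, Vector(e.name))
      open = open.updated(e.key, open.get(e.key) match {
        // an overlapping window of the same key merges into the open session
        case Some(s) if e.day < s.end =>
          Session(math.min(s.start, w.start), math.max(s.end, w.end), s.names :+ e.name)
        case _ => w
      })
    }
    // Stands in for the final Long.MaxValue watermark a finished bounded source emits.
    if (endOfInput) advance(Long.MaxValue)
    fired
  }

  def main(args: Array[String]): Unit = {
    val k1 = ("1", "T1"); val k2 = ("2", "T1"); val k3 = ("1", "T2")
    val events = Seq(Ev("Event1", k1, 24), Ev("Event2", k2, 26), Ev("Event3", k3, 28), Ev("Event4", k3, 28))
    println(run(events, gap = 1, endOfInput = false))
    println(run(events, gap = 1, endOfInput = true))
  }
}
```

With `endOfInput = false`, the session holding Event3 and Event4 is never emitted, because no later event pushes the watermark past it; only the end-of-input watermark flushes it, and an unbounded source such as the Kafka consumer in the question does not finish on its own. In this single-input model Event3 does close the (2, T1) session; in a real job the window operator's watermark is the minimum of the watermarks of all its input channels, so it can lag further behind than this model shows.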
Upvotes: 2