Son

Reputation: 977

Flink - How to implement custom session window that creates window on specific event and triggers after some session time?

My input stream looks like this:

type=1, time=10, start=123, other params
type=2, time=11, start=123, other params
type=2, time=12, start=123, other params
type=1, time=13, start=235, other params
type=2, time=14, start=123, other params
type=2, time=15, start=235, other params
type=2, time=16, start=235, other params
type=1, time=17, start=456, other params
...

I want to open a window when a type=1 event arrives. After that, type=2 events keep arriving for the same key (e.g. start=123) until the producer stops.

A type=1 event acts as a start event, and a type=2 event acts as a ping signalling that the producer is still alive. The two event types arrive on 2 separate topics.

My idea is to create a custom session window that opens when a type=1 event arrives and stays open until more than 3 mins have passed since the last type=2 event.

stream
  .keyBy(start)
  .window(CustomWindow())
  .trigger(CustomTrigger())
  ...

However, I don't know how to create a custom window that only starts when a type=1 event is received. From what I have read, a Trigger decides when the window function should fire, not when a window is created.

Expected result:

type=event-end, start=123, duration=3 (because there are 3 type=2 logs for 123)
-> this fires at time=17 because the last ping event is at time=14, so there is a gap of 3.
type=event-end, start=235, duration=2 (because there are 2 type=2 logs for 235)
-> this fires at time=19 because the last ping event is at time=16, so there is a gap of 3, provided there are no more pings after time=16.

How do I implement this custom window in Flink?

Upvotes: 1

Views: 1923

Answers (1)

Jiayi Liao

Reputation: 1009

I believe WindowAssigner.java is what you want. Extend it to define your own logic for assigning elements to windows and for triggering them.
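
To make the intended behaviour concrete, here is a minimal plain-Java sketch of the session logic described in the question, written without any Flink dependency. It is not a WindowAssigner implementation: the class PingSessionTracker and its methods are hypothetical names, events are assumed to arrive in time order, and the gap is in the same units as the time field. In a Flink job the same bookkeeping could probably live in keyed state with an event-time timer (for example in a KeyedProcessFunction on the stream keyed by start, after the two topics are unioned) rather than in a custom window, but that is an alternative approach and an assumption, not something stated above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the session logic; all names here are hypothetical. */
class PingSessionTracker {
    /** Per-key state: what a Flink job would keep in keyed state. */
    private static final class Session {
        int pings;      // number of type=2 events seen so far
        long lastSeen;  // time of the start event or of the latest ping

        Session(long startTime) {
            this.lastSeen = startTime;
        }
    }

    private final long gap;
    private final Map<Integer, Session> open = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    PingSessionTracker(long gap) {
        this.gap = gap;
    }

    /** Feed one event; events are assumed to arrive in time order. */
    void onEvent(int type, long time, int start) {
        advanceTo(time); // plays the role of a watermark / timer firing
        if (type == 1) {
            // Only a type=1 event may open a session for a key.
            open.putIfAbsent(start, new Session(time));
        } else if (type == 2) {
            Session s = open.get(start);
            if (s != null) { // pings without an open session are dropped
                s.pings++;
                s.lastSeen = time;
            }
        }
    }

    /** Close every session whose last event is at least `gap` old. */
    void advanceTo(long now) {
        open.entrySet().removeIf(e -> {
            Session s = e.getValue();
            if (now - s.lastSeen < gap) {
                return false; // still alive
            }
            results.add("type=event-end, start=" + e.getKey()
                    + ", duration=" + s.pings);
            return true;
        });
    }

    List<String> results() {
        return results;
    }
}
```

With a gap of 3, feeding the sample events from the question up to time=17 closes the session for start=123 with duration=3, and a later call to advanceTo(19) closes start=235 with duration=2, which matches the expected result in the question. A session that receives a start event but no pings closes with duration=0 in this sketch.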

Upvotes: 2
