Reputation: 977
My input stream
type=1, time=10, start=123, other params
type=2, time=11, start=123, other params
type=2, time=12, start=123, other params
type=1, time=13, start=235, other params
type=2, time=14, start=123, other params
type=2, time=15, start=235, other params
type=2, time=16, start=235, other params
type=1, time=17, start=456, other params
...
I want to create a window starting with type=1 event. After that, I have type=2 event continuously until key start=123 stops.
Type=1 event is similar to start-event, type=2 event is similar to ping event to signify that the producer is still alive. I have them in 2 separate topics.
I have an idea about creating a custom session window which starts when type=1 event happens, that window is open until it is more than 3 mins from the last type=2 event.
stream
.keyBy(start)
.window(CustomWindow())
.trigger(CustomTrigger())
...
However, I don't know how to create a custom window that only starts when receiving event type=1. I read about Trigger and it is about when the window function should fire, not when to create a Window.
Expected result:
type=event-end, start=123, duration=3 (because there are 3 type=2 log for 123)
-> this fires at time=17 because last ping event is at time=14, there is a gap of 3.
type=event-end, start=235, duration=2 (because there are 3 type=2 log for 123)
-> this fires at time=19 because last ping event is at time=16, there is a gap of 3 and if there is no more ping after time=16.
How do I implement this custom window in Flink?
Upvotes: 1
Views: 1923
Reputation: 1009
I believe WindowAssigner.java is what you want. Define your own logic of assigning and triggering of the window.
Upvotes: 2