Vojtech Letal

Reputation: 3098

Sorting union of streams to identify user sessions in Apache Flink

I have two streams of events

The lower index represents a timestamp. If we joined the two streams together and sorted them by time, we would get:

Would it be possible to implement custom Window / Trigger functions to group the events into sessions (the time between logins of different users)?

The problem I see is that the two streams are not necessarily sorted. I thought about sorting the input stream by timestamp; then it would be easy to implement the windowing using a GlobalWindow and a custom Trigger. However, it seems that sorting a stream is not possible.
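The grouping being asked about can be illustrated outside Flink. The following is a minimal plain-Java sketch, assuming a session is everything from one login up to (but not including) the next login, and that events arriving before the first login are ignored. The class `SessionSketch`, its `Record` type, and the kind tags "L" and "E" are hypothetical names chosen to echo the L/E labels used in this thread; none of this is Flink API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration (no Flink): union two inputs, sort by timestamp,
// and start a new session at every login. The record shape is hypothetical.
class SessionSketch {

  // Hypothetical record: kind "L" is a login, kind "E" is any other event.
  static final class Record {
    final String kind;
    final long timestamp;

    Record(String kind, long timestamp) {
      this.kind = kind;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return kind + timestamp;
    }
  }

  // Merges both inputs, sorts them by timestamp, and cuts a session at each login.
  // Events that precede the first login belong to no session and are skipped.
  static List<List<Record>> sessionize(List<Record> logins, List<Record> events) {
    List<Record> all = new ArrayList<>(logins);
    all.addAll(events);
    all.sort(Comparator.comparingLong(r -> r.timestamp)); // stable sort

    List<List<Record>> sessions = new ArrayList<>();
    List<Record> current = null;
    for (Record r : all) {
      if (r.kind.equals("L")) {
        current = new ArrayList<>(); // a login opens a new session
        sessions.add(current);
      }
      if (current != null) {
        current.add(r);
      }
    }
    return sessions;
  }
}
```

Once the union is in timestamp order, the cut is a single pass over the data; the hard part in a streaming job is producing that order from out-of-order inputs, which is what the answer addresses.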

Am I missing something, or is this definitely not possible in the current version of Flink (v1.3.2)?

Thanks

Upvotes: 1

Views: 1174

Answers (1)

David Anderson

Reputation: 43514

Question: shouldn't E3 come before L4?

Sorting is fairly straightforward using a ProcessFunction applied to a keyed stream (keyed state and timers are only available on keyed streams). Something like this:

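// Imports (Flink 1.3.x); place these at the top of the file containing the job.
import java.util.PriorityQueue;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Buffers events per key and emits them in timestamp order once the watermark
// has passed them. Assumes Event has a public long "timestamp" field and
// implements Comparable<Event> ordered by that timestamp.
public static class SortFunction extends ProcessFunction<Event, Event> {
  private ValueState<PriorityQueue<Event>> queueState = null;

  @Override
  public void open(Configuration config) {
    ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
        // state name
        "sorted-events",
        // type information of state
        TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
        }));
    queueState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
    TimerService timerService = context.timerService();

    // Events at or behind the current watermark are late and are dropped.
    if (context.timestamp() > timerService.currentWatermark()) {
      PriorityQueue<Event> queue = queueState.value();
      if (queue == null) {
        queue = new PriorityQueue<>(10);
      }
      queue.add(event);
      queueState.update(queue);
      // Fires once the watermark reaches this event's timestamp.
      timerService.registerEventTimeTimer(event.timestamp);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
    PriorityQueue<Event> queue = queueState.value();
    if (queue == null) {
      return; // an earlier timer already drained the buffer
    }
    long watermark = context.timerService().currentWatermark();
    // Emit every buffered event the watermark has passed, oldest first.
    while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
      out.collect(queue.poll());
    }
    // Write the queue back so the change is persisted (required with RocksDB).
    if (queue.isEmpty()) {
      queueState.clear();
    } else {
      queueState.update(queue);
    }
  }
}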
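To see the buffering logic on its own, here is a minimal plain-Java model of what SortFunction does for a single key, with no Flink dependencies. The class name `WatermarkSorter` and its methods are hypothetical: `add` plays the role of `processElement`, and `advanceWatermark` stands in for the event-time timers firing as the watermark moves forward. Timestamps are buffered directly instead of whole events to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model (no Flink) of SortFunction for a single key: buffer in a
// min-heap ordered by timestamp and release whatever the watermark has passed.
class WatermarkSorter {
  // Buffered timestamps; a real job would buffer whole events.
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private long watermark = Long.MIN_VALUE;

  // Mirrors processElement: elements at or behind the watermark are late and rejected.
  boolean add(long timestamp) {
    if (timestamp <= watermark) {
      return false;
    }
    buffer.add(timestamp);
    return true;
  }

  // Mirrors onTimer: advancing the watermark emits everything up to it, in order.
  List<Long> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark); // watermarks never move backwards
    List<Long> emitted = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek() <= watermark) {
      emitted.add(buffer.poll());
    }
    return emitted;
  }
}
```

Because elements at or behind the watermark are rejected, the output is complete only if the watermark does not overtake events that are still in flight; how much out-of-orderness to tolerate is decided by the timestamp/watermark assigner, not by the sorting function.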

Update: see "How to sort an out-of-order event time stream using Flink" for a description of a generally better approach.

Upvotes: 4
