koren maliniak

Reputation: 153

Flink - processing consecutive events within time constraint

I have a use case and I think I need some help on how to approach it. Because I am new to streaming and Flink, I will try to be very descriptive about what I am trying to achieve. Sorry if I am not using formal and correct language.

My code will be in Java, but I am happy to receive code in Python, pseudocode, or just a description of the approach.

TL;DR

  1. Group events of the same key that are within some time limit.
  2. Out of those events, create a result event only from the two closest events (in the time domain).
  3. This requires (I think) opening a window for each and every event that comes.
  4. If you look ahead at the batch solution, you will best understand my problem.

Background:

  1. I have data coming from sensors as a stream from Kafka.
  2. I need to use event time because the data arrives unordered. An allowed lateness of about 1 minute covers 90% of the events (see the watermark sketch after this list).
  3. I am grouping those events by some key.
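
For reference, an event-time setup matching that lateness might look like the following (a sketch, assuming Flink's WatermarkStrategy API from 1.11+; kafkaStream and the getTimestamp() accessor are hypothetical placeholders):

    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Sketch: assign event-time timestamps and watermarks that tolerate
    // about 1 minute of out-of-orderness. "kafkaStream" and getTimestamp()
    // (epoch milliseconds) are hypothetical placeholders.
    DataStream<ThreeEvent> events = kafkaStream.assignTimestampsAndWatermarks(
            WatermarkStrategy
                    .<ThreeEvent>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                    .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp()));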

What I want to do:

  1. Depending on some of the events' fields, I would like to "join/mix" 2 events into a new event (a "result event").
  2. The first condition is that those consecutive events are WITHIN 30 seconds of each other.
  3. The next conditions simply check some field values and then decide.

My pseudo-solution:

  1. Open a new window for EACH event. That window should be 1 minute long.
  2. For every event that arrives within that minute, I want to check its event time and see if it is within 30 seconds of the initial window event. If yes, check the other conditions and emit a new result event.

The problem: when a new event arrives, it needs to:

  1. create a new window for itself.
  2. Join only ONE window out of SEVERAL possible windows that are 30 seconds from it.

The question:

Is that possible?

In other words, the connection is between two "consecutive" events only.

Thank you very much.

Maybe showing the solution for the **batch case** will best show what I am trying to do:

for i in range(len(grouped_events) - 1):
    # compare each event with its immediate successor
    event_A = grouped_events[i]
    event_B = grouped_events[i + 1]
    if event_B.get("time") - event_A.get("time") < 30:
        if event_B.get("color") == event_A.get("color"):
            if event_B.get("size") > event_A.get("size"):
                create_result_event(event_A, event_B)

My (naive) attempts so far with Flink in Java

**The sum function is just a placeholder for my function that creates a new result object.**

  1. The first solution just applies a simple time window and sums by some field.
  2. The second tries to run a process function on the window, and maybe there iterate through all events and check for my conditions?

    DataStream
    .keyBy(threeEvent -> threeEvent.getUserId())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .sum("size")
    .print();
    
    
    
    DataStream
    .keyBy(threeEvent -> threeEvent.getUserId())
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new MyProcessWindowFunction());
    
    
    
    public static class MyProcessWindowFunction extends ProcessWindowFunction<ThreeEvent, Tuple3<Long, Long, Float>, Long, TimeWindow> {
        @Override
        public void process(Long key, Context context, Iterable<ThreeEvent> threeEvents, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            Float sumOfSize = 0F;
            for (ThreeEvent f : threeEvents) {
                sumOfSize += f.getSize();
            }
    
            out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfSize));
        }
    }
    

Upvotes: 1

Views: 1333

Answers (1)

David Anderson

Reputation: 43499

You can, of course, use windows to create mini-batches that you sort and analyze, but it will be difficult to handle the window boundaries correctly (what if the events that should be paired land in different windows?).

This looks like it would be much more easily done with a keyed stream and a stateful flatmap. Just use a RichFlatMapFunction and use one piece of keyed state (a ValueState) that remembers the previous event for each key. Then as each event is processed, compare it to the saved event, produce a result if that should happen, and update the state.
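
A minimal sketch of that stateful flatmap, assuming the question's ThreeEvent type (getTimestamp(), getColor(), and getSize() are assumed accessors) and a hypothetical ResultEvent output type:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Remembers the previous event per key and pairs it with the current one.
    public static class PairConsecutiveEvents extends RichFlatMapFunction<ThreeEvent, ResultEvent> {

        private transient ValueState<ThreeEvent> previous;

        @Override
        public void open(Configuration parameters) {
            previous = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("previousEvent", ThreeEvent.class));
        }

        @Override
        public void flatMap(ThreeEvent current, Collector<ResultEvent> out) throws Exception {
            ThreeEvent prev = previous.value();
            if (prev != null
                    && current.getTimestamp() - prev.getTimestamp() < 30_000  // within 30 seconds
                    && current.getColor().equals(prev.getColor())
                    && current.getSize() > prev.getSize()) {
                out.collect(new ResultEvent(prev, current));
            }
            previous.update(current);  // current becomes the "previous" event for the next call
        }
    }

which you would apply as:

    events
        .keyBy(ThreeEvent::getUserId)
        .flatMap(new PairConsecutiveEvents())
        .print();

Note that this pairs events in arrival order; as discussed below, out-of-order arrival is the catch.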

You can read about working with Flink's keyed state in the Flink training and in the Flink documentation.

The one thing that concerns me about your use case is whether or not your events may arrive out-of-order. Is it the case that to get correct results you would need to first sort the events by timestamp? That isn't trivial. If this is a concern, then I would suggest that you use Flink SQL with MATCH_RECOGNIZE, or the CEP library, both of which are designed for doing pattern recognition on event streams, and will take care of sorting the stream for you (you just have to provide timestamps and watermarks).

This query may not be exactly right, but hopefully conveys the flavor of how to do something like this with match recognize:

SELECT * FROM Events
MATCH_RECOGNIZE (
  PARTITION BY userId
  ORDER BY eventTime
  MEASURES
    A.userId as userId,
    A.color as color,
    A.size as aSize,
    B.size as bSize
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B)
  DEFINE
    A AS true,
    B AS timestampDiff(SECOND, A.eventTime, B.eventTime) < 30
         AND A.color = B.color
         AND A.size < B.size
);

This can also be done quite naturally with CEP, where the basis for comparing consecutive events is to use an iterative condition, and you can use a within clause to handle the time constraint.
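
For example, a rough CEP sketch (again not guaranteed to be exactly right; ThreeEvent is the question's type, ResultEvent is hypothetical, and events is assumed to be a DataStream<ThreeEvent> with timestamps and watermarks assigned):

    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Two strictly consecutive events A, B of the same key: B must arrive
    // within 30 seconds of A, have the same color, and be larger.
    Pattern<ThreeEvent, ?> pattern = Pattern.<ThreeEvent>begin("A")
            .next("B")
            .where(new IterativeCondition<ThreeEvent>() {
                @Override
                public boolean filter(ThreeEvent b, Context<ThreeEvent> ctx) throws Exception {
                    // Iterative condition: compare B against the event matched as "A".
                    ThreeEvent a = ctx.getEventsForPattern("A").iterator().next();
                    return a.getColor().equals(b.getColor()) && b.getSize() > a.getSize();
                }
            })
            .within(Time.seconds(30));  // bounds the time from A to B

    DataStream<ResultEvent> results = CEP
            .pattern(events.keyBy(ThreeEvent::getUserId), pattern)
            .select(new PatternSelectFunction<ThreeEvent, ResultEvent>() {
                @Override
                public ResultEvent select(Map<String, List<ThreeEvent>> match) {
                    return new ResultEvent(match.get("A").get(0), match.get("B").get(0));
                }
            });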

Upvotes: 1
