Reputation: 153
I have a use case and I think I need some help on how to approach it. Because I am new to streaming and Flink, I will try to be very descriptive about what I am trying to achieve. Sorry if I am not using formal and correct language.
My code will be in Java, but I do not mind getting code in Python, pseudocode, or just a general approach.
TL;DR
Background:
What I want to do:
My pseudo solution:
The Problem - When a new event comes it needs to:
The question:
Is that possible?
In other words, my connection is between two "consecutive" events only.
Thank you very much.
Maybe showing the solution for the **batch case** will best show what I am trying to do:
# iterate over consecutive pairs; stop one short of the end so i+1 stays in range
for i in range(len(grouped_events) - 1):
    event_A = grouped_events[i]
    event_B = grouped_events[i + 1]
    if event_B.get("time") - event_A.get("time") < 30:
        if event_B.get("color") == event_A.get("color"):
            if event_B.get("size") > event_A.get("size"):
                create_result_event(event_A, event_B)
My (naive) attempts so far with Flink in Java:
**The sum function is just a placeholder for my function that creates a new result object.**
The second attempt applies a process function on the window, where I could perhaps iterate through all the events and check my conditions.
DataStream
.keyBy(threeEvent -> threeEvent.getUserId())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.sum("size")
.print();
DataStream
.keyBy(threeEvent -> threeEvent.getUserId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new MyProcessWindowFunction());

public static class MyProcessWindowFunction extends ProcessWindowFunction<ThreeEvent, Tuple3<Long, Long, Float>, Long, TimeWindow> {
    @Override
    public void process(Long key, Context context, Iterable<ThreeEvent> threeEvents, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
        Float sumOfSize = 0F;
        for (ThreeEvent f : threeEvents) {
            sumOfSize += f.getSize();
        }
        out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfSize));
    }
}
Upvotes: 1
Views: 1333
Reputation: 43499
You can, of course, use windows to create mini-batches that you sort and analyze, but it will be difficult to handle the window boundaries correctly (what if the events that should be paired land in different windows?).
This looks like it would be much more easily done with a keyed stream and a stateful flatmap. Just use a RichFlatMapFunction and use one piece of keyed state (a ValueState) that remembers the previous event for each key. Then as each event is processed, compare it to the saved event, produce a result if that should happen, and update the state.
You can read about working with Flink's keyed state in the Flink training and in the Flink documentation.
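A rough standalone sketch of that logic (the PairMatcher, Event, and Pair names here are made up for illustration, and the HashMap stands in for the keyed ValueState that Flink would manage per userId for you):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Core logic of the stateful flatmap: remember the previous event per key,
// compare each new event against it, emit a pair when the conditions hold,
// then update the saved state.
class PairMatcher {
    record Event(long userId, long time, String color, float size) {}
    record Pair(Event a, Event b) {}

    // Stands in for keyed ValueState<Event>: one saved event per userId.
    private final Map<Long, Event> lastEvent = new HashMap<>();

    List<Pair> flatMap(Event current) {
        List<Pair> out = new ArrayList<>();
        Event previous = lastEvent.get(current.userId());
        if (previous != null
                && current.time() - previous.time() < 30
                && previous.color().equals(current.color())
                && current.size() > previous.size()) {
            out.add(new Pair(previous, current));
        }
        lastEvent.put(current.userId(), current); // always update the state
        return out;
    }
}
```

In the real RichFlatMapFunction, the HashMap lookup becomes `valueState.value()` and the put becomes `valueState.update(current)`, with the keying handled by `keyBy`.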
The one thing that concerns me about your use case is whether or not your events may arrive out-of-order. Is it the case that to get correct results you would need to first sort the events by timestamp? That isn't trivial. If this is a concern, then I would suggest that you use Flink SQL with MATCH_RECOGNIZE, or the CEP library, both of which are designed for doing pattern recognition on event streams, and will take care of sorting the stream for you (you just have to provide timestamps and watermarks).
This query may not be exactly right, but hopefully conveys the flavor of how to do something like this with MATCH_RECOGNIZE:

SELECT * FROM Events
MATCH_RECOGNIZE (
  PARTITION BY userId
  ORDER BY eventTime
  MEASURES
    A.userId AS userId,
    A.color AS color,
    A.size AS aSize,
    B.size AS bSize
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B)
  DEFINE
    A AS true,
    B AS timestampDiff(SECOND, A.eventTime, B.eventTime) < 30
      AND A.color = B.color
      AND A.size < B.size
);
This can also be done quite naturally with CEP, where the basis for comparing consecutive events is an iterative condition, and you can use a within clause to handle the time constraint.
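For the CEP route, here is a hedged sketch of such a pattern. It assumes flink-cep is on the classpath and reuses the question's ThreeEvent type, with an assumed getColor() getter alongside the getUserId() and getSize() getters shown in the question:

```java
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ConsecutivePairPattern {
    public static PatternStream<ThreeEvent> apply(DataStream<ThreeEvent> events) {
        Pattern<ThreeEvent, ?> pattern = Pattern.<ThreeEvent>begin("A")
            .next("B")  // B must be the event immediately after A
            .where(new IterativeCondition<ThreeEvent>() {
                @Override
                public boolean filter(ThreeEvent b, Context<ThreeEvent> ctx) throws Exception {
                    // an iterative condition can look back at the event matched as A
                    for (ThreeEvent a : ctx.getEventsForPattern("A")) {
                        return a.getColor().equals(b.getColor()) && b.getSize() > a.getSize();
                    }
                    return false;
                }
            })
            .within(Time.seconds(30));  // time constraint between A and B

        return CEP.pattern(events.keyBy(ThreeEvent::getUserId), pattern);
    }
}
```

From the returned PatternStream you would then select or process the matched (A, B) pairs to build your result events.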
Upvotes: 1