I am consuming messages from a Kafka topic, and each message is JSON. I would like to extract a field from each message (for example, a device ID) and create 'n' sessions for 'n' unique device IDs.
I have tried creating a new session window instance for every unique ID I receive, but after creating a new session window instance (i.e. creating a new branch in the pipeline for each ID), I am unable to route the subsequent messages to the corresponding branch that already exists.
The expected result: suppose we receive messages like
{ID:1,...}, {ID:2,...}, {ID:3,...}, {ID:1,...}
Three different sessions would be created, and the fourth message would go to the session for device ID 1. Is there a way to do this in the Apache Beam programming paradigm, or in plain Java? Any help would be greatly appreciated.
I have implemented Java and Python examples for this use case. The Java one follows the approach suggested by Daniel Oliveira, but I think it's useful to share a working sample.
Note that the Java example is featured in the Beam common patterns docs. Custom merging windows aren't supported in Python when running with the FnAPI.
We can adapt the code of the Sessions window function to fit our use case.
Briefly, when records are windowed into sessions, each one is assigned to a window that begins at the element's timestamp (unaligned windows) and ends at that start plus the gap duration. The mergeWindows function then combines all overlapping windows per key, resulting in an extended session.
We'll need to modify the assignWindows function to create a window with a data-driven gap instead of a fixed duration. We can access the element through WindowFn.AssignContext.element(). The original assignment function is:
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
The modified function will be:
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
  // Assign each element into a window from its timestamp until the gap in the
  // future. The gap is read from the element's JSON payload when present,
  // otherwise the default gapDuration is used. Overlapping windows
  // (representing elements within the gap of each other) will be merged.
  Duration dataDrivenGap;
  try {
    // Parse inside the try so malformed JSON also falls back to the default gap.
    JSONObject message = new JSONObject(c.element().toString());
    dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.getString(gapAttribute)));
  } catch (Exception e) {
    // Missing attribute, non-numeric value or unparseable JSON: use the default.
    dataDrivenGap = gapDuration;
  }
  return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}
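To make the fallback behaviour concrete, here is a minimal, Beam-free Python sketch of the same gap-extraction logic. It assumes messages are JSON strings and that the gap is given in whole seconds; data_driven_gap and DEFAULT_GAP_SECONDS are hypothetical names, and the 10-second default simply mirrors the withDefaultGapDuration value used in the pipeline snippet.

```python
import json

# Hypothetical default, mirroring withDefaultGapDuration(Duration.standardSeconds(10)).
DEFAULT_GAP_SECONDS = 10


def data_driven_gap(raw_message, gap_attribute, default_gap=DEFAULT_GAP_SECONDS):
    """Return the session gap in seconds for one message, or the default."""
    try:
        message = json.loads(raw_message)
        # Same idea as Long.parseLong(message.getString(gapAttribute)).
        return int(message[gap_attribute])
    except (ValueError, KeyError, TypeError):
        # Bad JSON, missing field, or non-numeric value: fall back to the default.
        return default_gap


print(data_driven_gap('{"user":"mobile","score":"12","gap":"5"}', "gap"))
print(data_driven_gap('{"user":"desktop","score":"4"}', "gap"))
```

Messages carrying "gap":"5" get a 5-second session gap; messages without it (or with an empty attribute name, as produced by withDefaultGapDuration alone) get the default.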
Note that we have added a couple of extra things. The withDefaultGapDuration and withGapAttribute methods are:
/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
public static DynamicSessions withDefaultGapDuration(Duration gapDuration) {
return new DynamicSessions(gapDuration, "");
}
public DynamicSessions withGapAttribute(String gapAttribute) {
return new DynamicSessions(gapDuration, gapAttribute);
}
We will also add a new field (gapAttribute) and a constructor:
public class DynamicSessions extends WindowFn<Object, IntervalWindow> {
  /** Default duration of the gaps between sessions. */
  private final Duration gapDuration;

  /** Name of the JSON field whose value, in seconds, overrides the session gap. */
  private final String gapAttribute;

  /** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration and gap attribute. */
  private DynamicSessions(Duration gapDuration, String gapAttribute) {
    this.gapDuration = gapDuration;
    this.gapAttribute = gapAttribute;
  }

  // ... assignWindows, the factory methods above and the remaining WindowFn
  // methods (adapted from Sessions) go here.
}
Then, we can window our messages into the new custom sessions with:
.apply("Window into sessions", Window.<String>into(DynamicSessions
    .withDefaultGapDuration(Duration.standardSeconds(10))
    .withGapAttribute("gap")))
To test this, we'll use a simple example with controlled input. For our use case, we'll consider different needs for our users depending on the device the app is running on. Desktop users can be idle for longer periods (allowing for longer sessions), whereas we only expect short-span sessions for our mobile users. We generate some mock data in which some messages contain the gap attribute and others omit it (those fall back to the default gap):
.apply("Create data", Create.timestamped(
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
TimestampedValue.of("{\"user\":\"desktop\",\"score\":\"4\"}", new Instant()),
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
TimestampedValue.of("{\"user\":\"desktop\",\"score\":\"10\"}", new Instant().plus(12000)))
.withCoder(StringUtf8Coder.of()))
For the desktop user there are only two events, separated by 12 seconds. No gap is specified, so it defaults to 10 seconds, and the two scores will not be added up because the events belong to different sessions.
The other user, mobile, has 4 events separated by 2, 7 and 3 seconds respectively. None of these separations is greater than the default gap, so with standard sessions all events belong to a single session with a total score of 18:
user=desktop, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=mobile, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=desktop, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)
With the new sessions, we specify a "gap" attribute of 5 seconds for those events. The third message arrives 7 seconds after the second one, so it now falls into a different session. The previous large session with score 18 is split into two 9-point sessions:
user=desktop, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=mobile, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=mobile, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=desktop, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)
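The arithmetic behind these two outputs can be checked with a small Python simulation. This is a sketch, not Beam's implementation: each event gets a window [t, t + gap), and overlapping windows of the same user are merged while summing scores, mirroring what assignWindows and mergeWindows do per key. Timestamps are idealized integer offsets in seconds (0, 2, 9, 12 for mobile; 0 and 12 for desktop), so window bounds will not match the wall-clock times in the logs; EVENTS and sessions are hypothetical names.

```python
from collections import defaultdict

# Idealized mock data: (offset_seconds, user, score, gap_seconds or None).
EVENTS = [
    (0, "mobile", 12, 5),
    (0, "desktop", 4, None),
    (2, "mobile", -3, 5),
    (9, "mobile", 2, 5),
    (12, "mobile", 7, 5),
    (12, "desktop", 10, None),
]


def sessions(events, default_gap, honor_gap_attribute):
    """Return {user: [(start, end, total_score), ...]} after per-user merging."""
    windows_by_user = defaultdict(list)
    for ts, user, score, gap in events:
        # Data-driven gap when enabled and present, otherwise the default.
        size = gap if honor_gap_attribute and gap is not None else default_gap
        windows_by_user[user].append((ts, ts + size, score))

    result = {}
    for user, windows in windows_by_user.items():
        merged = []
        for start, end, score in sorted(windows):
            if merged and start < merged[-1][1]:
                # Overlaps the open session: extend it and add the score.
                s, e, total = merged[-1]
                merged[-1] = (s, max(e, end), total + score)
            else:
                merged.append((start, end, score))
        result[user] = merged
    return result


standard = sessions(EVENTS, default_gap=10, honor_gap_attribute=False)
dynamic = sessions(EVENTS, default_gap=10, honor_gap_attribute=True)

for label, result in (("standard", standard), ("dynamic", dynamic)):
    for user, user_sessions in result.items():
        for start, end, total in user_sessions:
            print(f"{label}: user={user}, score={total}, window=[{start}..{end})")
```

With the 10-second default, mobile forms a single [0, 22) session scoring 18; honoring the 5-second gap splits it into [0, 7) and [9, 17), each scoring 9, while desktop keeps two separate sessions in both cases.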
Full code here. Tested with Java SDK 2.13.0
Analogously, we can extend the same approach to the Python SDK. The code for the Sessions class can be found here. We'll define a new DynamicSessions class. Inside the assign method we can access the processed record using context.element and modify the gap according to the data.
Original:
def assign(self, context):
    timestamp = context.timestamp
    return [IntervalWindow(timestamp, timestamp + self.gap_size)]
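Extended:
def assign(self, context):
    timestamp = context.timestamp
    try:
        # Elements are assumed to be (key, dict) pairs; use the per-record gap when present.
        gap = Duration.of(float(context.element[1]["gap"]))
    except (KeyError, IndexError, TypeError, ValueError):
        # No usable "gap" field: fall back to the default gap size.
        gap = self.gap_size
    return [IntervalWindow(timestamp, timestamp + gap)]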
If the input data contains a gap field, it overrides the default gap size. In our pipeline code we just need to window events into DynamicSessions instead of the standard Sessions:
'user_session_window' >> beam.WindowInto(DynamicSessions(gap_size=gap_size),
timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
With standard sessions the output is as follows:
INFO:root:>> User mobile had 4 events with total score 18 in a 0:00:22 session
INFO:root:>> User desktop had 1 events with total score 4 in a 0:00:10 session
INFO:root:>> User desktop had 1 events with total score 10 in a 0:00:10 session
With our custom windowing mobile events are split into two different sessions:
INFO:root:>> User mobile had 2 events with total score 9 in a 0:00:08 session
INFO:root:>> User mobile had 2 events with total score 9 in a 0:00:07 session
INFO:root:>> User desktop had 1 events with total score 4 in a 0:00:10 session
INFO:root:>> User desktop had 1 events with total score 10 in a 0:00:10 session
All files here. Tested with Python SDK 2.13.0
Yes, this is possible with the Beam paradigm if you use a custom WindowFn. You can subclass the Sessions class and modify it to set gap durations differently based on the ID of each element. You can do this in assignWindows, which looks like this in Sessions:
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
The AssignContext class can be used to access the element being assigned to this window, which allows you to retrieve the ID of that element.
It also sounds like you want elements with different IDs to be grouped into different windows (i.e. if elements A and B arrive within the gap duration but have different IDs, they should still be in different windows). This can be done by performing a GroupByKey with the ID of your elements as keys. Session windows apply on a per-key basis, as described in the Beam Programming Guide, so this will separate the elements by ID.
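Applied to the messages from the question, keying by device ID before session windowing yields one session per ID, and the fourth message extends the existing session for ID 1 instead of needing a new pipeline branch. The plain-Python sketch below simulates that per-key merging; the 10-second gap, the one-second arrival spacing and the name sessions_by_device are hypothetical, chosen only for illustration.

```python
from collections import defaultdict

GAP_SECONDS = 10  # hypothetical session gap

# Messages from the question, as (arrival_second, device_id).
MESSAGES = [(0, 1), (1, 2), (2, 3), (3, 1)]


def sessions_by_device(messages, gap):
    """Key each message by device ID, then merge [t, t + gap) windows per key."""
    windows = defaultdict(list)
    for ts, device_id in messages:
        windows[device_id].append((ts, ts + gap))

    sessions = {}
    for device_id, ws in windows.items():
        merged = []
        for start, end in sorted(ws):
            if merged and start < merged[-1][1]:
                # Same device, within the gap: extend the existing session.
                merged[-1] = (merged[-1][0], max(merged[-1][1], end))
            else:
                merged.append((start, end))
        sessions[device_id] = merged
    return sessions


print(sessions_by_device(MESSAGES, GAP_SECONDS))
```

Under these assumptions the result is {1: [(0, 13)], 2: [(1, 11)], 3: [(2, 12)]}: three sessions, with the message arriving at second 3 merged into device 1's session.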