totooooo

Reputation: 1415

Apache Beam with Python : How to compute the minimum in a session window, and apply it to all related PCollections

I'm using Apache Beam's Python SDK to process dictionaries that represent streaming analytics hits. The hits are aggregated with session windows. All my Dataflow job really has to do is apply these session windows and assign a session ID to all related hits.

As a session ID, I've figured I would use the timestamp of the first hit (combined with a cookie ID for each user). Here's my pipeline:

msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
        PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
    id_label='hit_id',
    timestamp_attribute='time')

hits = msgs | 'Parse' >> beam.Map(my_parser)

windowed_hits = hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))

visit_id = (windowed_hits | 'ExtractTimestamp' >> beam.Map(my_extracter)
    | 'GetMinimum' >> beam.CombineGlobally(my_min).without_defaults())

windowed_hits | 'SetVisitId' >> beam.Map(
    set_visit_id, visit_id=beam.pvalue.AsSingleton(visit_id))

my_parser applies literal_eval to transform strings into dicts. my_extracter takes the timestamp out of the hit. set_visit_id just takes an argument and assigns it to the key visit_id.
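For reference, here is a minimal sketch of what those helpers could look like; only the names, the literal_eval detail, and the 'time' field come from the description above, the exact bodies are illustrative:

from ast import literal_eval

def my_parser(msg):
    # Evaluate the string payload into a dict.
    return literal_eval(msg)

def my_extracter(hit):
    # Pull the raw timestamp out of a hit.
    return hit['time']

def set_visit_id(hit, visit_id):
    # Copy the computed session start onto each hit.
    hit['visit_id'] = visit_id
    return hit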

This doesn't seem to work. When debugging, my visit_id branch appears to work correctly: it waits for the session to end before computing the minimum. But when it is used as a side input, I only get a pvalue.EmptySideInput. How can I get the result I want, and why does my code return an empty side input?

Edit: I've replaced AsSingleton with AsIter, to get an idea of what's going wrong here. What I get is a _FilteringIterable with a _target_window attached.

So I guess the problem is this _target_window, but I do not understand why it ranges from TS + 60 to TS + 120. Could it be because of the timestamp of the WindowedValue? It seems likely, as the boundaries of the _target_window seem to be derived from its rounded value.
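That guess is consistent with how Sessions assigns windows: each value initially gets its own proto-window of [timestamp, timestamp + gap), and windows only merge when a grouping operation runs. A quick sketch checking this in isolation (my own illustration, with TS = 0 for readability):

from apache_beam.transforms import window

fn = window.Sessions(60)
# A value stamped at TS + 60, e.g. the combined minimum emitted at the
# end of the session window [TS, TS + 60):
ctx = window.WindowFn.AssignContext(timestamp=60)
print(fn.assign(ctx))  # one IntervalWindow spanning [60, 120)

So if the combined minimum is emitted with a timestamp at the end of the session window, the side-input lookup targets a later window than the one the hits live in, which would explain the EmptySideInput.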

Upvotes: 1

Views: 580

Answers (1)

totooooo

Reputation: 1415

I eventually managed to do what I wanted by throwing away the Combine and replacing it with a GroupByKey.

import apache_beam as beam  # assumed import, to make the snippet self-contained
from ast import literal_eval

def my_parser(msg):
    # Turn the raw message (a dict literal as a string) into a dict.
    return literal_eval(msg)

def set_key(hit):
    # Key each hit by its cookie ID, so sessions are computed per user.
    return (hit['cid'], hit)

def set_vid2(keyed_hits):
    k, hits = keyed_hits
    hits = list(hits)  # materialize: mutations may not survive re-iteration of grouped values
    visit_id = min(h['time'] for h in hits)
    for h in hits:
        h['visit_id'] = visit_id
    return hits

def unpack_list(l):
    # Yield each hit, so FlatMap turns lists of hits back into individual hits.
    for d in l:
        yield d

msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
        PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
    id_label='hit_id',
    timestamp_attribute='time')

hits = msgs | 'Parse' >> beam.Map(my_parser)

keyed_hits = hits | 'SetKey' >> beam.Map(set_key)

windowed_hits = (keyed_hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))
    | 'Grouping' >> beam.GroupByKey())

clean_hits = windowed_hits | 'ComputeMin' >> beam.Map(set_vid2)

clean_hits | 'Unpack' >> beam.FlatMap(unpack_list)

After the GroupByKey, I have a PCollection whose elements are lists of hits (grouped by cookie ID + session window). Then, once the visit ID is computed and set on every hit, I turn my PCollection of lists of hits back into a PCollection of individual hits with unpack_list.
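Incidentally, FlatMap flattens whatever iterable its callable returns, so the unpack_list generator could equally be an identity lambda; this is an equivalent variant, not what the answer above used:

clean_hits | 'Unpack' >> beam.FlatMap(lambda hits: hits)  # same effect as unpack_list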

I'm not sure this is the right way to do it, though, or whether it has any impact on performance.

Upvotes: 1
