Mike Keyes

Reputation: 325

How to get the end of window timestamp in Apache Beam Python

I'm creating 20-second sliding time windows, one starting every 5 seconds, from batched log data:

    import apache_beam as beam
    from apache_beam.transforms import window

    # p is an existing beam.Pipeline and query is a BigQuery SQL string
    rows = p | 'read events' >> beam.io.Read(beam.io.BigQuerySource(query=query))

    # set the timestamp field used for windowing, then assign 20-second windows starting every 5 seconds
    ts_rows = (rows | 'set timestamp' >> beam.ParDo(AddTimestampDoFn())
                    | 'set window' >> beam.WindowInto(window.SlidingWindows(20, 5)))

    # extract only the user id and relevant data, then group and process per window
    rows_with_data = (ts_rows | 'extract data' >> beam.FlatMap(lambda row:
                                [(str(row['user_id']), [row['data1'], row['data2'], row['data3']])])
                              | 'group by user id' >> beam.GroupByKey()
                              | 'Process window' >> beam.ParDo(WindowDataProcessingDoFn()))

How can I access the timestamp information for each window in Python? There is an answer for Java (How to get the max timestamp of the current sliding window), but I don't know how to translate it into Python. Ideally, I'd like the end time of each window rather than the max or min timestamp of the data within the window.

Upvotes: 5

Views: 4600

Answers (1)

x97Core

Reputation: 1514

I went to the link you provided.

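Note: window=beam.DoFn.WindowParam is the parameter mentioned on the page you linked.

When a DoFn's process method declares a parameter with the default window=beam.DoFn.WindowParam, Beam passes in the window the current element belongs to, and that window's end time is window.end. In Python, you can access it like this: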

Define your DoFn:

    import apache_beam as beam


    class BuildRecordFn(beam.DoFn):
        # Beam fills in `window` with the window the current element belongs to
        def process(self, element, window=beam.DoFn.WindowParam):
            # window_start = window.start.to_utc_datetime()
            window_end = window.end.to_utc_datetime()  # exclusive end of the window
            # element is a (key, count) tuple here; append the window end to it
            yield element + (window_end,)
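With the SlidingWindows(20, 5) from the question, every element belongs to four overlapping 20-second windows, so after GroupByKey the DoFn runs once per (key, window) pair and sees a different window.end on each call. The plain-Python sketch below reproduces, as far as I understand it, the rule Beam's SlidingWindows(size, period, offset) uses to assign a timestamp to windows, working in plain seconds instead of Beam's microsecond Timestamp objects. sliding_windows is a hypothetical helper for illustration, not part of Beam.

```python
def sliding_windows(t, size=20, period=5, offset=0):
    """Return the [start, end) windows that contain timestamp t (in seconds).

    Hypothetical helper: the latest window start at or before t is aligned
    to the period (shifted by offset), and every earlier aligned start whose
    window still covers t is kept.
    """
    last_start = t - ((t - offset) % period)
    windows = []
    start = last_start
    while start > t - size:  # window [start, start + size) still contains t
        windows.append((start, start + size))
        start -= period
    return windows


print(sliding_windows(12))
```

For example, an event at t=12 falls into [10, 30), [5, 25), [0, 20) and [-5, 15), so the window ends a DoFn would see for it are 30, 25, 20 and 15. The end is exclusive: an event at exactly t=20 is not in [0, 20).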

Then use it like this:

    import apache_beam as beam
    from apache_beam.io import ReadFromText, WriteToText


    def format_result(xs):
        # join the tuple fields into one comma-separated line
        ys = [str(x) for x in xs]
        return ','.join(ys)


    # ParseEventFn is the answerer's own parser (not shown); it is expected to
    # yield dicts with 'timestamp' (Unix seconds) and 'objectID' fields
    lines = p | ReadFromText(known_args.input)
    counts = (
        lines
        | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
        | 'AddEventTimestamp' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
        | 'ExtractObjectID' >> beam.Map(lambda elem: elem['objectID'])
        | 'FixedWindow' >> beam.WindowInto(beam.window.FixedWindows(60 * 1))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum)
        | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
        | 'Format' >> beam.Map(format_result)
        | WriteToText(known_args.output))

Upvotes: 6
