Reputation: 325
I'm creating sliding time windows 20 seconds long every 5 seconds from batched log data:
rows = p | 'read events' >> beam.io.Read(beam.io.BigQuerySource(query=query))
# set timestamp field used for windowing and set 20 second long window every 5 seconds
ts_rows = (rows | 'set timestamp' >> beam.ParDo(AddTimestampDoFn())
                | 'set window' >> beam.WindowInto(window.SlidingWindows(20, 5)))
# extract only user id and relevant data, group and process
rows_with_data = (ts_rows | 'extract data' >> beam.FlatMap(lambda row:
                      [(str(row['user_id']), [row['data1'], row['data2'], row['data3']])])
                  | 'group by user id' >> beam.GroupByKey()
                  | 'Process window' >> beam.ParDo(WindowDataProcessingDoFn()))
How can I access the timestamp information for each window in Python? (There is an answer for Java, "How to get the max timestamp of the current sliding window", but I don't know how to translate it into Python.) Ideally I want the end time of each window rather than the max or min timestamp of the data within the window.
Upvotes: 5
Views: 4600
Reputation: 1514
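I looked at the answer you linked. The Python equivalent is to declare a parameter with the default value beam.DoFn.WindowParam in your DoFn's process method, e.g. window=beam.DoFn.WindowParam. Beam then passes in the window that the element belongs to, and the window end time is available as window.end. In Python, you can access it like this: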
Define your DoFn:
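import apache_beam as beam

class BuildRecordFn(beam.DoFn):
    def __init__(self):
        super(BuildRecordFn, self).__init__()

    def process(self, element, window=beam.DoFn.WindowParam):
        # Beam injects the window that contains this element.
        # window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        # element is a tuple here (key, sum); append the window end time to it.
        return [element + (window_end,)]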
Then use it like this:
from apache_beam.io import ReadFromText, WriteToText

# Define the formatter before the pipeline that references it.
def format_result(xs):
    ys = [str(x) for x in xs]
    return ','.join(ys)

lines = p | ReadFromText(known_args.input)
counts = (
    lines
    | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
    | 'AddEventTimestamp' >> beam.Map(
        lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
    | 'ExtractObjectID' >> beam.Map(lambda elem: (elem["objectID"]))
    | 'FixedWindow' >> beam.WindowInto(
        beam.window.FixedWindows(60*1))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum)
    | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
    | 'Format' >> beam.Map(format_result)
    | WriteToText(known_args.output))
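Since the question uses sliding windows rather than fixed ones, it may help to see which window end times to expect. With SlidingWindows(20, 5) each element falls into 20 / 5 = 4 overlapping windows, so after the GroupByKey your DoFn should be called once per key per window, each time with a different window.end. The snippet below is only a plain-Python sketch of that assignment arithmetic, not Beam code: the function name sliding_window_bounds is made up for illustration, timestamps are assumed to be whole seconds, and the offset is assumed to be 0.

```python
def sliding_window_bounds(timestamp, size=20, period=5, offset=0):
    """Return (start, end) pairs for every sliding window containing timestamp.

    Illustration only: hypothetical helper, timestamps in whole seconds.
    """
    # Start of the most recent window that begins at or before the timestamp.
    last_start = timestamp - ((timestamp - offset) % period)
    # Step back one period at a time while the window still covers the timestamp.
    starts = range(last_start, timestamp - size, -period)
    return [(start, start + size) for start in starts]


# An element with event time 12 s belongs to four 20-second windows.
for start, end in sliding_window_bounds(12):
    print(f"window [{start}, {end}) -> window.end would be {end}")
```

Note that the end of a window is exclusive, so window.end is the boundary itself and not the timestamp of the last element that can fall inside the window.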
Upvotes: 6