Reputation: 8802
I have a pipeline that takes a bounded PCollection, assigns timestamps to it, and windows it into sliding windows. After a grouping transform, I want to assign the resulting PCollection back to the global window, but I have not been able to figure out how. See the sample Beam pseudo-code below:
import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.io.ReadFromText()
        | beam.ParDo(AddTimestampDoFn())
        | beam.WindowInto(beam.window.SlidingWindows(60, 60))
        | beam.GroupByKey()
        | beam.ParDo(SomethingElse())
        | beam.WindowInto(GlobalWindow())  # Here is where I want to bring back to global window
    )
Any ideas on how to go about it?
Upvotes: 2
Views: 901
Reputation: 7058
Using beam.WindowInto(window.GlobalWindows()) should work. For example, with this quick test:
import logging
import time

import apache_beam as beam
from apache_beam import window

# A single element carrying its own event time
data = [{'message': 'Hi', 'timestamp': time.time()}]

# p is an existing beam.Pipeline
events = (p
    | 'Create Events' >> beam.Create(data)
    | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp']))
    | 'Sliding Windows' >> beam.WindowInto(beam.window.SlidingWindows(60, 60))
    | 'First window' >> beam.ParDo(DebugPrinterFn())
    | 'global Window' >> beam.WindowInto(window.GlobalWindows())
    | 'Second window' >> beam.ParDo(DebugPrinterFn()))
where DebugPrinterFn prints window information:
class DebugPrinterFn(beam.DoFn):
    """Just prints the element and window"""
    def process(self, element, window=beam.DoFn.WindowParam):
        logging.info("Received message %s in window=%s", element['message'], window)
        yield element
I get the following output:
INFO:root:Received message Hi in window=[1575565500.0, 1575565560.0)
INFO:root:Received message Hi in window=GlobalWindow
Tested with the DirectRunner and the 2.16.0 SDK. If it does not work for you:
Full code here
Upvotes: 3