Jadiel de Armas

Reputation: 8802

Assign PCollection back to global window

I have a pipeline that takes a bounded PCollection, assigns timestamps to its elements, and windows it into sliding windows. After a grouping transform, I want to assign the resulting PCollection back to the global window, but I have not been able to figure out how to do this. See the sample Beam pseudo-code below:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.io.ReadFromText()
        | beam.ParDo(AddTimestampDoFn())
        | beam.WindowInto(beam.window.SlidingWindows(60, 60))
        | beam.GroupByKey()
        | beam.ParDo(SomethingElse())
        | beam.WindowInto(GlobalWindow()) # Here is where I want to bring back to global window
    )

Any ideas on how to go about it?

Upvotes: 2

Views: 901

Answers (1)

Guillem Xercavins

Reputation: 7058

Using beam.WindowInto(window.GlobalWindows()) should work. For example, with this quick test:

import time

import apache_beam as beam
from apache_beam import window

data = [{'message': 'Hi', 'timestamp': time.time()}]

# p is an existing beam.Pipeline
events = (p
  | 'Create Events' >> beam.Create(data)
  | 'Add Timestamps' >> beam.Map(lambda x: window.TimestampedValue(x, x['timestamp']))
  | 'Sliding Windows' >> beam.WindowInto(window.SlidingWindows(60, 60))
  | 'First window' >> beam.ParDo(DebugPrinterFn())
  | 'Global Window' >> beam.WindowInto(window.GlobalWindows())
  | 'Second window' >> beam.ParDo(DebugPrinterFn()))

where DebugPrinterFn prints window information:

class DebugPrinterFn(beam.DoFn):
  """Just prints the element and window"""
  def process(self, element, window=beam.DoFn.WindowParam):
    logging.info("Received message %s in window=%s", element['message'], window)
    yield element

I get the following output:

INFO:root:Received message Hi in window=[1575565500.0, 1575565560.0)
INFO:root:Received message Hi in window=GlobalWindow
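
To see where the first window's bounds come from, here is a rough plain-Python sketch of how a timestamp could be mapped to sliding windows of a given size and period. It is not Beam's own implementation: the function name sliding_windows is hypothetical, and the timestamp 1575565530.0 is an assumed value chosen to fall inside the window shown above (the actual time.time() value is not in the output).

```python
def sliding_windows(timestamp, size, period, offset=0):
    """Return the [start, end) windows that contain `timestamp` (illustrative only)."""
    # Latest window start at or before the timestamp, aligned to the period.
    last_start = timestamp - (timestamp - offset) % period
    windows = []
    start = last_start
    # Walk back one period at a time while the window still covers the timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


# Assumed timestamp; size=60 and period=60 match SlidingWindows(60, 60).
print(sliding_windows(1575565530.0, 60, 60))
# -> [(1575565500.0, 1575565560.0)]
```

With size equal to period, each element lands in exactly one window, which is why the first DebugPrinterFn logs a single interval. After the second WindowInto, the same element is reported in the single global window instead.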

Tested with the DirectRunner and 2.16.0 SDK. If it does not work for you:

  • Do you get any error?
  • Which runner and SDK are you using?

Full code here

Upvotes: 3
