Reputation: 2119
I wrote a streaming pipeline (it starts by reading from Pub/Sub) and I want to add windowing to it. I'd like to test it properly now, so how can I create a "dummy" stream?
My code:
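import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions
from apache_beam.runners.direct.direct_runner import DirectRunner
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import window

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=pipeline_options, runner=DirectRunner())
xmls_beam = beam.Create(xmls)  # xmls, process_xmls and FilterTI are defined elsewhere
# GroupByKey expects FilterTI to emit (key, value) pairs
x = p | xmls_beam | beam.FlatMap(process_xmls) | beam.ParDo(FilterTI()) | beam.WindowInto(window.FixedWindows(200)) | beam.GroupByKey()
result = p.run()
result.wait_until_finish()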
Upvotes: 0
Views: 1400
Reputation: 159
You can simulate a "dummy stream" with a PCollection of TimestampedValue elements, so that each element carries its own event timestamp.
For example, if your input is:
import apache_beam as beam
from apache_beam.transforms import window

l = [window.TimestampedValue('a', 100), window.TimestampedValue('b', 300)]
pc = p | beam.Create(l) | ...
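Beam's FixedWindows assigns each element to the window whose start is the timestamp minus (timestamp - offset) mod size, where offset defaults to 0 and windows are half-open [start, start + size). The plain-Python sketch below mirrors that rule for the two timestamps above; fixed_window_bounds is a hypothetical helper for illustration, not part of the Beam API.

```python
def fixed_window_bounds(timestamp, size, offset=0):
    """Return the half-open [start, end) window FixedWindows(size, offset)
    would assign to `timestamp` (hypothetical helper mirroring Beam's rule)."""
    start = timestamp - (timestamp - offset) % size
    return start, start + size


for value, ts in [('a', 100), ('b', 300)]:
    print(value, fixed_window_bounds(ts, 200))
# a (0, 200)
# b (200, 400)
```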
In your case (fixed windows of width 200 seconds), you can expect element 'a' (timestamp 100) to fall into the first window, [0, 200), and element 'b' (timestamp 300) into the second window, [200, 400).
Upvotes: 1