sacherus

Reputation: 2119

How to (unit) test a streaming pipeline in Apache Beam in Python?

I wrote a streaming pipeline (it starts with Pub/Sub) and I want to add a windowing mechanism to it. I would like to test it properly, so how can I create a "dummy" stream?

My code:

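    import apache_beam as beam
    from apache_beam import window
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions
    from apache_beam.runners.direct.direct_runner import DirectRunner
    from apache_beam.testing.test_pipeline import TestPipeline

    # xmls, process_xmls and FilterTI are defined elsewhere in my code
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True
    p = TestPipeline(options=pipeline_options, runner=DirectRunner())
    xmls_beam = beam.Create(xmls)
    x = p | xmls_beam | beam.FlatMap(process_xmls) | beam.ParDo(FilterTI()) | beam.WindowInto(window.FixedWindows(200)) | beam.GroupByKey()
    result = p.run()
    result.wait_until_finish()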

Upvotes: 0

Views: 1400

Answers (1)

Yueyang Qiu

Reputation: 159

You can simulate a "dummy stream" using a PCollection of TimestampedValue.

For example, if your input is:

    l = [window.TimestampedValue('a', 100), window.TimestampedValue('b', 300)]
    pc = p | beam.Create(l) | ...

then in your case (fixed windows 200 seconds wide) you can expect element 'a' (timestamp 100) to fall into the first window, [0, 200), and 'b' (timestamp 300) into the second, [200, 400).
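
To see why the two elements land in different windows, here is a rough sketch in plain Python of how a fixed window is chosen for a timestamp. It does not use Beam at all; `fixed_window` and `events` are names made up for illustration, and the arithmetic assumes windows aligned to timestamp 0 (offset 0), which is what `FixedWindows(200)` uses by default.

```python
def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) fixed window containing `timestamp`.

    Hypothetical helper for illustration only: it aligns windows to
    `offset` and assumes timestamps and `size` are in seconds.
    """
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


# The same (value, timestamp) pairs as in the example above.
events = [("a", 100), ("b", 300)]

# Map each value to the window its timestamp falls into.
windows = {value: fixed_window(ts, 200) for value, ts in events}

for value, (start, end) in windows.items():
    print(f"{value!r} -> [{start}, {end})")
```

A timestamp sitting exactly on a boundary (for example 200) belongs to the window that starts there, since the end of each window is exclusive. That is worth keeping in mind when picking test timestamps.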

Upvotes: 1
