Coen Van Duijnhoven

Reputation: 33

How to properly use the Dataflow / Apache beam wait_until_finish duration parameter?

I have a batch processing job running on Dataflow (GCP) with apache-beam[gcp]==2.19.0 on the Dataflow runner. I created a custom template for the job. The job runs as expected, but I also want to enforce a maximum job duration. I found the duration parameter (in milliseconds) of the wait_until_finish() method, which should do what I need. My question is: how do I make the templated batch job stop automatically when it runs longer than duration? I don't need to keep any data; I just want the job to stop when it runs too long. I've implemented the run function as follows:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider


def run():
    opts = PipelineOptions()
    user_options = opts.view_as(UserOptions)
    p = beam.Pipeline(options=opts)

    (p |
     "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,
                                                        use_standard_sql=StaticValueProvider(bool, True))) |
     "Get data" >> beam.ParDo(doStuff()) |
     "Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |
     "Write to BQ" >> beam.io.WriteToBigQuery(
                table=user_options.table_spec,
                schema=user_options.table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
     )

    result = p.run()

    # Wait at most 30 minutes (duration is in milliseconds)
    result.wait_until_finish(duration=1800000)

Upvotes: 2

Views: 4924

Answers (1)

Riyadh Uddin

Reputation: 41

No, Dataflow does not provide automatic cancellation after a certain period, but you can achieve your goal by calling cancel() once the timed wait returns:

    # Wait at most 30 minutes (duration is in milliseconds)
    result.wait_until_finish(duration=1800000)
    if not result.is_in_terminal_state():   # pipeline still running after timeout
      result.cancel()
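Note that `is_in_terminal_state()` is a method of `DataflowPipelineResult`, so the snippet above is tied to the Dataflow runner. As a runner-agnostic alternative, you can compare the state returned by `wait_until_finish` against the terminal job states yourself. A minimal sketch (the `should_cancel` helper is illustrative, not part of the Beam API; the state names match Beam's `PipelineState` string constants):

```python
# Terminal pipeline states, mirroring apache_beam's PipelineState string
# constants ('DONE', 'FAILED', ...). Any other state after the timed wait
# means the job is still making (or stuck in) progress.
TERMINAL_STATES = {"DONE", "FAILED", "CANCELLED", "UPDATED", "DRAINED"}


def should_cancel(state):
    """Return True when a job still in `state` after the timeout should be cancelled."""
    return state not in TERMINAL_STATES
```

It would slot into `run()` like this: `state = result.wait_until_finish(duration=1800000)`, then `if should_cancel(str(state)): result.cancel()`.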

Upvotes: 4
