Reputation: 33
I have a batch processing job running on Dataflow on GCP with apache-beam[gcp]==2.19.0 on the Dataflow runner. I created a custom template for the job. The job runs as expected, but I also want to enforce a maximum job duration. I found the duration parameter (in milliseconds) of the wait_until_finish() method, which should be available. The question is: how do I make the templated batch job stop automatically when it runs longer than duration? I do not need to keep any data; I just want the job to stop when it runs too long. I've implemented the run function as follows:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider

# UserOptions, doStuff and outputData are defined elsewhere in the module.

def run():
    opts = PipelineOptions()
    user_options = opts.view_as(UserOptions)
    p = beam.Pipeline(options=opts)
    (p
     | "Read data" >> beam.io.Read(beam.io.BigQuerySource(
         query=user_options.query,
         use_standard_sql=StaticValueProvider(bool, True)))
     | "Get data" >> beam.ParDo(doStuff())
     | "Output data" >> beam.ParDo(outputData(param1=user_options.input1))
     | "Write to BQ" >> beam.io.WriteToBigQuery(
         table=user_options.table_spec,
         schema=user_options.table_schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
    result = p.run()
    result.wait_until_finish(duration=1800000)  # 1800000 ms = 30 minutes
Upvotes: 2
Views: 4924
Reputation: 41
No, Dataflow does not provide automatic cancellation after a certain period. Still, you can achieve your goal by calling cancel() once wait_until_finish() returns after the timeout:
result.wait_until_finish(duration=1800000)  # wait at most 30 minutes
if not result.is_in_terminal_state():  # if the pipeline isn't finished, cancel
    result.cancel()
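As a side note, is_in_terminal_state() is a method on the Dataflow runner's result object. If you prefer a runner-agnostic check, here is a minimal sketch of the same idea using the SDK's PipelineState helper; the 30-minute constant is just an example value, and this assumes it runs in the same process that launched the pipeline:

from apache_beam.runners.runner import PipelineState

MAX_DURATION_MS = 30 * 60 * 1000  # example timeout: 30 minutes in milliseconds

result = p.run()
result.wait_until_finish(duration=MAX_DURATION_MS)

# PipelineState.is_terminal reports whether the job has reached a
# terminal state (e.g. DONE, FAILED or CANCELLED); if not, cancel it.
if not PipelineState.is_terminal(result.state):
    result.cancel()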
Upvotes: 4