Reputation: 160
We have a Dataflow pipeline which begins by extracting data from BigQuery; the data are then written to CSV in a Google Cloud Storage bucket using apache_beam.io's WriteToText function. Because the files are sharded, we need to run a piece of code that merges them together using storage_client.compose(). However, since we don't know when WriteToText completes its export, we call result.wait_until_finish() to wait for the export to complete and then continue with the merging code.
On my local machine the code runs as expected: the query gets called, followed by the file exports, then result.wait_until_finish() returns DONE and storage_client.compose() gets called.
The code runs in a flex template image. When running on Google Dataflow (i.e. --runner=DataflowRunner), result.wait_until_finish() returns UNKNOWN and execution moves immediately to storage_client.compose() without the previous steps ever having completed. The compose() method then raises an exception since no files exist in the bucket.
Why does wait_until_finish() return UNKNOWN, and what does it actually mean?
Why does wait_until_finish() not wait for the previous steps to complete?
If we cannot rely on wait_until_finish(), how can we still merge the files in Dataflow?
Snippet of the code:
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
from google.cloud import storage

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument("--extraction_query", help="A Big Query script to extract data")
    # other arguments removed
    known_args, beam_args = parser.parse_known_args()
    beam_args.append('--setup_file')
    beam_args.append('./setup.py')

    # Pipeline
    beam_options = PipelineOptions(beam_args, save_main_session=True)
    p = beam.Pipeline(options=beam_options)

    data_pipeline = (p
        | 'Execute extraction query' >> beam.io.Read(beam.io.BigQuerySource(query=known_args.extraction_query, use_standard_sql=True))
    )

    if (known_args.bigquery_export_bucket is not None):
        # variable preparations removed...
        # "csv" here is a project helper module providing convertDictToCSV / generateCSVHeader
        (data_pipeline
            | 'Convert BigQuery data set to CSV' >> beam.Map(csv.convertDictToCSV, column_names)
            | 'Write data set to CSV' >> WriteToText(data_set_output, num_shards=31))

        (p
            | 'Create data set headers list' >> beam.Create([column_names])
            | 'Convert data set headers to CSV' >> beam.Map(csv.generateCSVHeader)
            | 'Write data set headers' >> WriteToText(data_set_headers_output))

    result = p.run()
    job_status = result.wait_until_finish(duration=7200000)
    logging.info(f'The pipeline completed with status {job_status}.')

    # Note: this branch is also entered when the status is UNKNOWN
    if (job_status == 'DONE' or job_status == 'UNKNOWN'):
        storage_client = storage.Client()
        export_bucket = storage_client.get_bucket({bucketId})
        export_blobs = list(storage_client.list_blobs(known_args.bigquery_export_bucket, prefix=known_args.bigquery_export_bucket_folder))

        blobs_to_compose = []
        for export_blob in export_blobs:
            blobs_to_compose.append(export_blob)
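The snippet above stops just before the merge itself. For reference, a minimal sketch of how the compose step might continue, assuming blobs_to_compose lists the header blob first and composed_file_name is a hypothetical name for the merged object; GCS compose() accepts at most 32 source objects per call (which matches the 31 data shards plus the header file):

# Sketch only: merge the sharded files into one object.
# `composed_file_name` is illustrative, not from the original code.
destination = export_bucket.blob(composed_file_name)
destination.compose(blobs_to_compose)  # concatenates sources in list order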
My question might resemble this one, although that question doesn't seem to have been answered.
Upvotes: 1
Views: 1515
Reputation: 63
wait_until_finish() isn't supported by Flex Templates in Google Cloud Dataflow.
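If wait_until_finish() can't be relied on from a Flex Template launcher, one possible workaround (not claimed by this answer; a sketch assuming google-api-python-client is installed and the launcher may call the Dataflow API) is to poll the job state via the Dataflow REST API:

import time
from googleapiclient.discovery import build  # google-api-python-client

def poll_job_until_terminal(project_id, region, job_id, poll_seconds=30):
    """Poll projects.locations.jobs.get until the job reaches a terminal state."""
    dataflow = build('dataflow', 'v1b3')
    terminal_states = {'JOB_STATE_DONE', 'JOB_STATE_FAILED',
                       'JOB_STATE_CANCELLED', 'JOB_STATE_DRAINED'}
    while True:
        job = dataflow.projects().locations().jobs().get(
            projectId=project_id, location=region, jobId=job_id).execute()
        state = job.get('currentState', '')
        if state in terminal_states:
            return state
        time.sleep(poll_seconds)

# e.g. poll_job_until_terminal('my-project', 'europe-west1', result.job_id())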
Upvotes: 2
Reputation: 6033
There are actually a few variants of "unknown" job states:
- the actual UNKNOWN state, and
- a state, such as DRAINING, that older SDKs do not know how to interpret.
In all cases, UNKNOWN should not be considered a terminal state, so wait_until_finish should keep waiting, unless the service separately indicates that the job is complete. I've confirmed this in the code, which has been that way for a number of years, so if there is a problem it lies elsewhere.
The full documentation for job states is at https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate
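To illustrate the point (a sketch, not code from the question): because UNKNOWN is not terminal, one option is to poll PipelineResult.state explicitly and only proceed to the merge once a genuinely terminal state is reached:

import time
import logging
from apache_beam.runners.runner import PipelineState

TERMINAL_STATES = {PipelineState.DONE, PipelineState.FAILED,
                   PipelineState.CANCELLED, PipelineState.DRAINED,
                   PipelineState.UPDATED}

def wait_for_terminal_state(result, poll_seconds=30, timeout_seconds=7200):
    """Keep polling result.state; UNKNOWN is deliberately not treated as terminal."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        state = result.state
        logging.info('Current job state: %s', state)
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError('Job did not reach a terminal state within the timeout.')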
Upvotes: 1