Reputation: 323
I have been running Dataflow jobs based on a template created back in December that passes some arguments at runtime, without any issues. I have now had to make some changes to the template, and I can't seem to generate a working one, even when using the same code and Beam version as before. My jobs just hang indefinitely; I tried leaving one running and it timed out after an hour or so.
There's certainly an issue, as even my first step, which just creates an empty PCollection, doesn't succeed; it just says Running.
I have abstracted the hell out of the function to work out what the issue might be, since there are no errors or oddities in the logs. Sharing the very slimmed-down pipeline below: as soon as I comment out the 2nd and 3rd lines in the pipeline, which use the value provider arguments, the job succeeds (at creating an empty PCollection).
My use of the 'add_value_provider_argument' follows pretty closely the official snippet here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L554 and https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#using-valueprovider-in-your-functions
I borrowed it from Pablo here: https://stackoverflow.com/a/58327762/5687904
I even tried building a completely fresh environment in a new VM, thinking that maybe something in my environment was corrupting the template without causing the build to fail.
I've tried Dataflow SDK 2.15.0, which is what the original template used, as well as 2.24.0 (the most recent one).
Would really appreciate any ideas around debugging this as I'm starting to despair.
import logging
import pandas as pd
import argparse
import datetime

#================ Apache beam ======================
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import fileio
import io

#======================
PROJECT_ID = 'my-project'
GCS_STAGING_LOCATION = 'gs://my-bucket//gcs_staging_location/'
GCS_TMP_LOCATION = 'gs://my-bucket/gcs_tmp_location/'

#======================================
# https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#valueprovider
class FileIterator(beam.DoFn):
    def __init__(self, files_bucket):
        self.files_bucket = files_bucket

    def process(self, element):
        files = pd.read_csv(str(element), header=None).values[0].tolist()
        bucket = self.files_bucket.get()
        files = [str(bucket) + '/' + file for file in files]
        logging.info('Files list is: {}'.format(files))
        return files

#=========================================================
# https://stackoverflow.com/questions/58240058/ways-of-using-value-provider-parameter-in-python-apache-beam
class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp):
        self.vp = vp

    def process(self, unused_elm):
        yield self.vp.get()

#=========================================================
class RuntimeOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--files_bucket',
            help='Bucket where the raw files are',
            type=str)
        parser.add_value_provider_argument(
            '--complete_batch',
            help='Text file with filenames in it location',
            type=str)
        parser.add_value_provider_argument(
            '--comp_table',
            required=False,
            help='BQ table to write to (dataset.table)',
            type=str)

#=========================================================
def run():
    #====================================
    # TODO PUT AS PARAMETERS
    #====================================
    dt_now = datetime.datetime.now().strftime('%Y%m%d')
    job_name = 'dataflow-test-{}'.format(dt_now)

    pipeline_options_batch = PipelineOptions()
    runtime_options = pipeline_options_batch.view_as(RuntimeOptions)

    setup_options = pipeline_options_batch.view_as(SetupOptions)
    setup_options.setup_file = './setup.py'

    google_cloud_options = pipeline_options_batch.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.staging_location = GCS_STAGING_LOCATION
    google_cloud_options.temp_location = GCS_TMP_LOCATION

    pipeline_options_batch.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline_options_batch.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'
    pipeline_options_batch.view_as(WorkerOptions).max_num_workers = 10
    pipeline_options_batch.view_as(SetupOptions).save_main_session = True
    pipeline_options_batch.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

    with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:
        try:
            final_data = (
                pipeline_2
                | 'Create empty PCollection' >> beam.Create([None])
                | 'Get accepted batch file' >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
                # | 'Read all filenames into a list' >> beam.ParDo(FileIterator(runtime_options.files_bucket))
            )
        except Exception as exception:
            logging.error(exception)
            pass

#=========================================================
if __name__ == "__main__":
    run()
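For reference, I build and launch the template along the lines of the standard classic-template workflow below (bucket names, paths, and parameter values here are placeholders, not my real ones):

```shell
# Build the template: running the pipeline script with --template_location
# stages the template to GCS instead of executing the job.
python my_pipeline.py \
    --runner DataflowRunner \
    --project my-project \
    --staging_location gs://my-bucket/gcs_staging_location/ \
    --temp_location gs://my-bucket/gcs_tmp_location/ \
    --template_location gs://my-bucket/templates/dataflow-test

# Launch the template, supplying the ValueProvider arguments at runtime.
gcloud dataflow jobs run dataflow-test-run \
    --gcs-location gs://my-bucket/templates/dataflow-test \
    --parameters complete_batch=gs://my-bucket/batch.csv,files_bucket=gs://my-bucket
```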
Upvotes: 0
Views: 597
Reputation: 143
It seems that when you originally created the template, the Apache Beam SDK you used was forward-compatible with the package versions listed in your setup.py file, so it worked; however, the SDK version you used when regenerating the template may not be forward-compatible with those same listed versions.
Based on this documentation, the Apache Beam SDK and Dataflow workers must have forward-compatible libraries to avoid version collisions that can result in unexpected behavior in the service.
To find the required package versions for each Apache Beam SDK release, take a look at this page.
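One way to catch this kind of mismatch before building a template is to compare the installed SDK version against the version your setup.py pins. Below is a minimal, self-contained sketch; the helper names and the versions shown are illustrative, and in a real check you would pass `apache_beam.__version__` as the installed version:

```python
# Minimal sketch: fail fast when the installed Apache Beam SDK version
# does not match the version pinned for the template build.

def parse_version(version):
    """Turn a version string like '2.24.0' into a comparable tuple (2, 24, 0)."""
    return tuple(int(part) for part in version.split("."))

def is_compatible(installed, pinned):
    """Require an exact major.minor match, a conservative default for Beam SDKs."""
    return parse_version(installed)[:2] == parse_version(pinned)[:2]

if __name__ == "__main__":
    print(is_compatible("2.24.0", "2.24.0"))  # True: same SDK line
    print(is_compatible("2.15.0", "2.24.0"))  # False: rebuild the environment first
```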
Upvotes: 1