Reputation: 321
The aim of the pipeline is to use runtime variables to open a CSV file and to name a BigQuery table.
All I need is to be able to access these variables globally, or within a ParDo, for example by passing them into the function.
I have tried creating a dummy string and then running a FlatMap to access the runtime parameters and make them global, but it returns nothing.
e.g.:
class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')

def run():
    def rewrite_values(element):
        """ Rewrite default env values"""
        # global project_id
        # global campaign_id
        # global organization_id
        # global language
        # global file_path
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path)))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()
            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("language: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)
        return file_path

    pipeline_options = PipelineOptions()
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | beam.Create(["Start"])
                 | beam.FlatMap(rewrite_values))

    # ... pipeline processing, running the pipeline, etc.
I can access the project variable without a problem, but everything else comes back blank.
If I make the custom_options variable global, or when I pass a specific custom options object into a function, such as:
| 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path))
it only returns something like RuntimeValueProvider(option: path, type: str, default_value: None).
If I use:
| 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path.get()))
the variable is an empty string.
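For what it's worth, the usual pattern for runtime parameters inside a ParDo is to hand the DoFn the ValueProvider object itself and only call .get() at execution time, i.e. inside process(). Below is a minimal sketch of that idea, assuming a hypothetical ReadGcsBlobs DoFn and using Beam's GcsIO to open the file (any GCS client would do):

import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO

class ReadGcsBlobs(beam.DoFn):
    def __init__(self, path_file):
        # Store the ValueProvider itself; do NOT call .get() here, because at
        # template construction time the runtime value does not exist yet.
        self.path_file = path_file

    def process(self, element):
        # .get() is only resolved at execution time, so it is safe inside process()
        path = self.path_file.get()
        with GcsIO().open(path) as f:
            for line in f:
                # GcsIO yields bytes; decoding to text is an assumption about the file
                yield line.decode('utf-8')

# Construction time: pass the ValueProvider, not .get()
# | 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path))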
So in essence, I just need to access these variables globally; is that possible?
Finally, to clarify, I do not want to use the ReadFromText method. I can use the runtime variable there, but incorporating the runtime options into the dict created from the CSV file would be too costly, as I am working with huge CSV files.
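For the table-name half of the aim, the same deferred-resolution idea applies on the write side. A hedged sketch, assuming a Beam SDK version whose WriteToBigQuery accepts a ValueProvider for the table argument (the rows PCollection and schema string below are placeholders):

# Sketch only: `rows` and the schema string are hypothetical placeholders.
rows | beam.io.WriteToBigQuery(
    table=custom_options.table_name,   # ValueProvider, resolved at runtime
    schema='field_1:STRING,field_2:STRING',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)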
Upvotes: 1
Views: 2680
Reputation: 7058
For me it worked by declaring cloud_options and custom_options as global:
import argparse, logging

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')


def rewrite_values(element):
    """ Rewrite default env values"""
    # global project_id
    # global campaign_id
    # global organization_id
    # global language
    # global file_path
    try:
        logging.info("File Path with str(): {}".format(str(custom_options.path.get())))
        logging.info("----------------------------")
        logging.info("element: {}".format(element))
        project_id = str(cloud_options.project)
        file_path = custom_options.path.get()
        table_name = custom_options.table_name.get()
        logging.info("project: {}".format(project_id))
        logging.info("File path: {}".format(file_path))
        logging.info("language: {}".format(table_name))
        logging.info("----------------------------")
    except Exception as e:
        logging.info("Error format----------------------------")
        raise KeyError(e)
    return file_path


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(pipeline_args)
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | beam.Create(["Start"])
                 | beam.FlatMap(rewrite_values))

    result = p.run()
    # result.wait_until_finish


if __name__ == '__main__':
    run()
After setting the PROJECT and BUCKET variables, I staged the template with:
python script.py \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://$BUCKET/staging \
--temp_location gs://$BUCKET/temp \
--template_location gs://$BUCKET/templates/global_options
And executed it, providing the path and table_name options:
gcloud dataflow jobs run global_options \
--gcs-location gs://$BUCKET/templates/global_options \
--parameters path=test_path,table_name=test_table
And the runtime parameters seem to be logged fine inside the FlatMap.
Upvotes: 3