Reputation: 909
What is the best practice for getting arguments from pipeline_options?
Dummy code example:
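```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument("--pubsub_sub")
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=pipeline_options) as pipeline:
    # here I want to use the project argument
    # I can't do pipeline.options.project
    # because a warning is displayed
    (
        pipeline
        | "Create" >> beam.Create([None])  # placeholder source so the Maps have input
        | "Transformation 1" >> beam.Map(lambda x: known_args.pubsub_sub)  # this is fine
        | "Transformation 2" >> beam.Map(lambda x: pipeline.options.project)  # this is not fine
    )
```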
How do I use the standard arguments the pipeline needs (project, region, etc.), as opposed to the user-defined ones?
Upvotes: 2
Views: 2676
Reputation: 909
Thanks for the answers.
Your answers plus the Beam docs give me the full picture of options management. To sum up:
- Custom arguments are defined with argparse; known_args and beam_args are the objects returned by parser.parse_known_args().
- known_args should be used in our pipeline code directly (for example known_args.pubsub_topic).
- beam_args are used to create the PipelineOptions object, which is passed into the Pipeline() object.
- For arguments that Beam already defines (project, streaming, etc.), we should not redefine them in our custom parser. Instead, we should create a view_as() object and use it explicitly (like known_args). Two examples below.

Example 1: the project argument.

```python
import argparse

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument("--pubsub_topic")
# "project" will go into beam_args because we didn't define it in our parser
known_args, beam_args = parser.parse_known_args()
pipeline_options = PipelineOptions(beam_args)
# this argument will be available in "gcp_args" because the GoogleCloudOptions class
# defines the "project" argument (you can check the source code)
gcp_args = pipeline_options.view_as(GoogleCloudOptions)
# and then, wherever we want to use it, we do:
gcp_args.project
```
Example 2: the streaming argument.

```python
import argparse

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

parser = argparse.ArgumentParser()
parser.add_argument("--pubsub_topic")
# "streaming" will go into beam_args because we didn't define it in our parser
known_args, beam_args = parser.parse_known_args()
pipeline_options = PipelineOptions(beam_args)
# this argument will be available in "std_args" because the StandardOptions class
# defines the "streaming" argument (you can check the source code)
std_args = pipeline_options.view_as(StandardOptions)
# and then, wherever we want to use it, we do:
std_args.streaming
```
So, to see which arguments Apache Beam already defines, we have to look at the source code on GitHub.
Upvotes: 2
Reputation: 5104
Rather than accessing options via the pipeline object, simply use pipeline_options directly.
Upvotes: 1
Reputation: 6582
I think the best practice is to use a custom options class as follows (I kept your initial code):
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MyPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--project", help="Project", required=True)
        parser.add_argument("--pubsub_sub", help="Pub Sub", required=True)


pipeline_options = PipelineOptions()
# view_as() gives typed access to the options declared in MyPipelineOptions
my_pipeline_options = pipeline_options.view_as(MyPipelineOptions)
with beam.Pipeline(options=pipeline_options) as pipeline:
    # here I want to use the project argument
    # I can't do pipeline.options.project
    # because a warning is displayed
    (
        pipeline
        | "Create" >> beam.Create([None])  # placeholder source so the Maps have input
        | "Transformation 1" >> beam.Map(lambda x: my_pipeline_options.pubsub_sub)
        | "Transformation 2" >> beam.Map(lambda x: my_pipeline_options.project)
    )
```
I think that for predefined options like project, you have to add them to the MyPipelineOptions class to use them in your Python code.
Upvotes: 3