Pav3k

Reputation: 909

Use Apache Beam arguments within the pipeline

What is the best practice to get arguments from pipeline_options?

Dummy code example:

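import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument("--pubsub_sub")  # user-defined argument

known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)

with beam.Pipeline(options=pipeline_options) as pipeline:
    # here I want to use the project argument
    # I can't do pipeline.options.project
    # because a warning is displayed
    (
        pipeline
        | "Create" >> beam.Create([None])
        | "Transformation 1" >> beam.Map(lambda x: known_args.pubsub_sub)  # this is fine
        | "Transformation 2" >> beam.Map(lambda x: pipeline.options.project)  # this is not fine
    )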

How do I access the standard arguments that the pipeline itself needs (project, region, etc.), as opposed to the user-defined ones?

Upvotes: 2

Views: 2676

Answers (3)

Pav3k

Reputation: 909

Thanks for the answers.

Your answers, together with the Beam docs, give me the full picture of how options are managed. To sum up:

  1. If we need to define additional, custom arguments (a Pub/Sub topic or whatever), we build a simple parser using argparse.
  2. Create the known_args and beam_args objects with parser.parse_known_args().
  3. known_args should be used directly in our pipeline code (for example known_args.pubsub_topic).
  4. beam_args are used to create the PipelineOptions object, which is passed into the Pipeline() object.
  5. [The most important for me] If we need to explicitly use an argument that Apache Beam already defines (project, streaming, etc.), we should not redefine it in our custom parser. Instead, we should create a view_as() object and use it explicitly (like known_args). Two examples follow.

Example 1: we want to use the GCP project argument.
import argparse

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument("--pubsub_topic")  # our custom argument

# "project" will go into beam_args because we didn't define it in our parser
known_args, beam_args = parser.parse_known_args()
pipeline_options = PipelineOptions(beam_args)

# this argument will be available in "gcp_args" because the GoogleCloudOptions
# class defines the "project" argument (you can check the source code)
gcp_args = pipeline_options.view_as(GoogleCloudOptions)

# and then, wherever we need it, we use:
gcp_args.project
Example 2: we want to use the streaming argument.
import argparse

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

parser = argparse.ArgumentParser()
parser.add_argument("--pubsub_topic")  # our custom argument

# "streaming" will go into beam_args because we didn't define it in our parser
known_args, beam_args = parser.parse_known_args()
pipeline_options = PipelineOptions(beam_args)

# this argument will be available in "std_args" because the StandardOptions
# class defines the "streaming" argument (you can check the source code)
std_args = pipeline_options.view_as(StandardOptions)

# and then, wherever we need it, we use:
std_args.streaming

So, to see which arguments Apache Beam already defines, we have to look at the source code on GitHub (apache_beam/options/pipeline_options.py in the Python SDK).

Upvotes: 2

robertwb

Reputation: 5104

Rather than accessing options via the pipeline object, simply use pipeline_options directly.

Upvotes: 1

Mazlum Tosun

Reputation: 6582

I think the best practice is to use a custom options class, as follows (I kept your initial code):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MyPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--project", help="Project", required=True)
        parser.add_argument("--pubsub_sub", help="Pub Sub", required=True)


my_pipeline_options = PipelineOptions().view_as(MyPipelineOptions)
pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as pipeline:
    # here I want to use the project argument
    # I can't do pipeline.options.project
    # because a warning is displayed
    (
        pipeline
        | "Create" >> beam.Create([None])
        | "Transformation 1" >> beam.Map(lambda x: my_pipeline_options.pubsub_sub)
        | "Transformation 2" >> beam.Map(lambda x: my_pipeline_options.project)
    )

I think that for predefined options like project, you have to add them to the MyPipelineOptions class to use them in your Python code.

Upvotes: 3
