Raj Oberoi

Reputation: 774

apache_beam.io.ReadFromBigQuery pass parameters dynamically in Dataflow pipeline templates

I am running a GCP Dataflow pipeline by deploying it as a template on GCP. The pipeline runs a BigQuery read statement whose filter condition needs to be supplied dynamically. How do I do this?

The query I want to run is something like

select * from table_name where field1=[dynamic_value]

Here is the sample code to run the query

    import sys

    import apache_beam as beam

    # MyOptions and MyPreprocessor are custom classes defined elsewhere.
    query_string = "select * from table_name where field1=[dynamic_value]"

    options = {
        'project': PROJECT_ID,
        'runner': 'Direct',
        'temp_location': 'gs://my_bucket/temp',
        'region': 'australia-southeast1',
    }
    pipeline_options = beam.pipeline.PipelineOptions(sys.argv, **options)
    custom_options = pipeline_options.view_as(MyOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        sample = (
            pipeline
            | 'QueryTable' >> beam.io.ReadFromBigQuery(query=query_string, use_standard_sql=False)
            | 'Processing' >> beam.ParDo(MyPreprocessor())
            | beam.Map(print))

I need to pass `dynamic_value` from a command-line option `--dynamic_value`. I can read it from `sys.argv` when the pipeline is not a template, but once I deploy it as a template, `dynamic_value` has to arrive as a PipelineOption. How do I build the query dynamically in that case?

Surprisingly, `beam.io.ReadFromBigQuery` accepts a dataset, project ID, and table as parameters, but has no parameter for specifying a filter on the data. Querying the entire table is unnecessary and expensive. Can someone provide a solution?

Upvotes: 2

Views: 4336

Answers (1)

robertwb

Reputation: 5104

You should look into using Flex Templates, which support fully dynamic configuration of queries in a template.

Upvotes: 0
