Vinay Karode

Reputation: 35

Providing 'query' parameter to 'beam.io.BigQuerySource' at runtime in dataflow python

TL;DR: I would like to run beam.io.BigQuerySource with a different query every month, using the Dataflow API and templates. If that is not possible, can I pass a query to beam.io.BigQuerySource at runtime while still using the Dataflow API and templates?

I have a Dataflow batch data pipeline that reads a BigQuery table as shown below:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--pro_id',
        dest='pro_id',
        type=str,
        default='xxxxxxxxxx',
        help='project id')
    parser.add_argument(
        '--dataset',
        dest='dataset',
        type=str,
        default='xxxxxxxxxx',
        help='bigquery dataset to read data from')

    args, pipeline_args = parser.parse_known_args(argv)
    project_id = args.pro_id
    dataset_id = args.dataset

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:

        companies = (
                p
                | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query_bq(project_id, dataset_id),
                                                                              use_standard_sql=True))
        )

The query parameter for beam.io.BigQuerySource is calculated by a function like this:

from datetime import datetime
def query_bq(project, dataset):
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    query = (
        f'SELECT * FROM `{project}.{dataset}.data_{month}_json` '
        f'LIMIT 10'
    )
    return query

A couple of things to note here:

  1. I want to run this data pipeline once a day
  2. The table id changes from month to month. So, for example, the table id for this month would be data_2020_06_01_json and for next month it would be data_2020_07_01_json; all of this is calculated by query_bq(project, dataset) above.
  3. I would like to automate running this batch pipeline with the Dataflow API, using a Cloud Function, a Pub/Sub event and Cloud Scheduler.

Here is the Cloud Function that gets triggered by Cloud Scheduler publishing an event to Pub/Sub every day:

import ast
import base64
from datetime import datetime

from googleapiclient.discovery import build


def run_dataflow(event, context):
    if 'data' in event:
        pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        pubsub_message_dict = ast.literal_eval(pubsub_message)
        event = pubsub_message_dict.get("eventName")
        now = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
        project = 'xxx-xxx-xxx'
        region = 'europe-west2'
        dataflow = build('dataflow', 'v1b3', cache_discovery=False)
        if event == "run_dataflow":
            job = f'dataflow-{now}'
            template = 'gs://xxxxx/templates/xxxxx'
            request = dataflow.projects().locations().templates().launch(
                projectId=project,
                gcsPath=template,
                location=region,
                body={
                    'jobName': job,
                }
            )
            response = request.execute()
            print(response)

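For reference, the templates().launch request body can also carry a parameters map that supplies values for any runtime ValueProvider options the template exposes; a minimal sketch, assuming a hypothetical --query_bq value-provider option like the one attempted further down (the option name and dataset are illustrative):

# Sketch only: 'query_bq', 'my-project' and 'my_dataset' are placeholders,
# not part of my actual code.
month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
request = dataflow.projects().locations().templates().launch(
    projectId=project,
    gcsPath=template,
    location=region,
    body={
        'jobName': job,
        # fills in parser.add_value_provider_argument('--query_bq', ...) at runtime
        'parameters': {
            'query_bq': f'SELECT * FROM `my-project.my_dataset.data_{month}_json` LIMIT 10',
        },
    },
)
response = request.execute()
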
Here is the command I use to build and stage the Dataflow template for this pipeline:

python main.py \
    --setup_file ./setup.py \
    --project xxx-xx-xxxx \
    --pro_id xxx-xx-xxxx \
    --dataset 'xx-xxx-xxx' \
    --machine_type=n1-standard-4 \
    --max_num_workers=5 \
    --num_workers=1 \
    --region europe-west2  \
    --service_account_email=xxx-xxx-xxx \
    --runner DataflowRunner \
    --staging_location gs://xx/xx \
    --temp_location gs://xx/temp \
    --subnetwork="xxxxxxxxxx" \
    --template_location gs://xxxxx/templates/xxxxx

The problem I'm facing:

My query_bq function is called when the pipeline is compiled into a Dataflow template and staged to GCS; it is not called again at runtime. So whenever my Cloud Function launches the template, the job always reads from the data_2020_06_01_json table, and the table in the query stays the same even as we move into July, August and so on. What I really want is for the query to change dynamically based on query_bq, so that future runs read from data_2020_07_01_json, data_2020_08_01_json and so on.

I have also looked at the generated template file, and the query is indeed hard-coded into the template at compile time. Here's a snippet:

 "name": "beamapp-xxxxx-0629014535-344920",
  "steps": [
    {
      "kind": "ParallelRead",
      "name": "s1",
      "properties": {
        "bigquery_export_format": "FORMAT_AVRO",
        "bigquery_flatten_results": true,
        "bigquery_query": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10",
        "bigquery_use_legacy_sql": false,
        "display_data": [
          {
            "key": "source",
            "label": "Read Source",
            "namespace": "apache_beam.runners.dataflow.ptransform_overrides.Read",
            "shortValue": "BigQuerySource",
            "type": "STRING",
            "value": "apache_beam.io.gcp.bigquery.BigQuerySource"
          },
          {
            "key": "query",
            "label": "Query",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "STRING",
            "value": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10"
          },
          {
            "key": "validation",
            "label": "Validation Enabled",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "BOOLEAN",
            "value": false
          }
        ],
        "format": "bigquery",
        "output_info": [
          {

An alternative I've tried

I also tried the ValueProvider approach described in https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#pipeline-io-and-runtime-parameters

and added this to my code:

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--query_bq', type=str)

user_options = pipeline_options.view_as(UserOptions)
p | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query_bq,
                                                                              use_standard_sql=True))

And when I run this, I get this error:

WARNING:apache_beam.utils.retry:Retry with exponential backoff: waiting for 3.9023594566785924 seconds before retrying get_query_location because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'str'> for field query, found SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10 (type <class 'apache_beam.options.value_provider.StaticValueProvider'>)

So I'm guessing beam.io.BigQuerySource does not accept ValueProviders.

Upvotes: 2

Views: 2000

Answers (1)

Pablo

Reputation: 11021

You cannot use ValueProviders with BigQuerySource, but in more recent versions of Beam you can use beam.io.ReadFromBigQuery, which supports them well.

You would do:

result = (p 
          | beam.io.ReadFromBigQuery(query=options.input_query,
                                     ....))

You can pass value providers to it, and it has a lot of other utilities. Check out its documentation.
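
A minimal sketch of the pipeline side, assuming a value-provider option named input_query to match the snippet above (the option name and the run() wrapper are illustrative, not from the original post):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Resolved at runtime, e.g. from the template launch request's
        # {'parameters': {'input_query': 'SELECT ...'}}
        parser.add_value_provider_argument('--input_query', type=str)


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    user_options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        rows = (
            p
            | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
                query=user_options.input_query,  # a ValueProvider is accepted here
                use_standard_sql=True)
        )
        # ... further transforms on `rows`

Note that ReadFromBigQuery exports the query results to GCS before reading them, so the pipeline also needs a --temp_location (or an explicit gcs_location=) it can write to.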

Upvotes: 1
