Reputation: 35
TLDR: I would like to run beam.io.BigQuerySource with a different query every month using the Dataflow API and templates. If that is not possible, can I pass the query to beam.io.BigQuerySource at runtime while still using the Dataflow API and templates?
I have a Dataflow batch pipeline that reads a BigQuery table, as shown below:
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--pro_id',
        dest='pro_id',
        type=str,
        default='xxxxxxxxxx',
        help='project id')
    parser.add_argument(
        '--dataset',
        dest='dataset',
        type=str,
        default='xxxxxxxxxx',
        help='bigquery dataset to read data from')
    args, pipeline_args = parser.parse_known_args(argv)
    project_id = args.pro_id
    dataset_id = args.dataset
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(argv=pipeline_args) as p:
        companies = (
            p
            # query_bq() is evaluated here, while the pipeline graph is built
            | "Read from BigQuery" >> beam.io.Read(
                beam.io.BigQuerySource(
                    query=query_bq(project_id, dataset_id),
                    use_standard_sql=True))
        )
And the query parameter for beam.io.BigQuerySource is calculated by a function like this:
from datetime import datetime


def query_bq(project, dataset):
    # First day of the current month, e.g. "2020_06_01"
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    query = (
        f'SELECT * FROM `{project}.{dataset}.data_{month}_json` '
        f'LIMIT 10'
    )
    return query
A couple of things to note here: the table ID changes every month, so this month the query reads from data_2020_06_01_json, next month the table ID would be data_2020_07_01_json, and so on, all of which is calculated by def query_bq(project, dataset) above.
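For example, for a run during June 2020 the helper returns the following (project and dataset placeholders as above):

>>> query_bq('xxxx', 'xxxx')
'SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10'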
Here is the Cloud Function that gets triggered by Cloud Scheduler publishing an event to Pub/Sub every day:
import ast
import base64
from datetime import datetime

from googleapiclient.discovery import build


def run_dataflow(event, context):
    if 'data' in event:
        # Decode the Pub/Sub message published by Cloud Scheduler
        pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        pubsub_message_dict = ast.literal_eval(pubsub_message)
        event = pubsub_message_dict.get("eventName")
        now = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
        project = 'xxx-xxx-xxx'
        region = 'europe-west2'
        dataflow = build('dataflow', 'v1b3', cache_discovery=False)
        if event == "run_dataflow":
            job = f'dataflow-{now}'
            template = 'gs://xxxxx/templates/xxxxx'
            # Launch the template previously staged in GCS
            request = dataflow.projects().locations().templates().launch(
                projectId=project,
                gcsPath=template,
                location=region,
                body={
                    'jobName': job,
                }
            )
            response = request.execute()
            print(response)
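For reference, my understanding is that if the template exposed a runtime parameter for the query, the launch request would carry it in the parameters field of the body, something like the sketch below (the parameter name query_bq just mirrors the ValueProvider option I tried further down; this is not what I currently run):

request = dataflow.projects().locations().templates().launch(
    projectId=project,
    gcsPath=template,
    location=region,
    body={
        'jobName': job,
        # Hypothetical runtime parameter; the template would have to declare
        # a matching ValueProvider option for this value to be picked up.
        'parameters': {
            'query_bq': 'SELECT * FROM `xxxx.xxxx.data_2020_07_01_json` LIMIT 10',
        },
    }
)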
Here is the command I use to compile the pipeline and stage the template on Dataflow:
python main.py \
  --setup_file ./setup.py \
  --project xxx-xx-xxxx \
  --pro_id xxx-xx-xxxx \
  --dataset 'xx-xxx-xxx' \
  --machine_type=n1-standard-4 \
  --max_num_workers=5 \
  --num_workers=1 \
  --region europe-west2 \
  --serviceAccount=xxx-xxx-xxx \
  --runner DataflowRunner \
  --staging_location gs://xx/xx \
  --temp_location gs://xx/temp \
  --subnetwork="xxxxxxxxxx" \
  --template_location gs://xxxxx/templates/xxxxx
My query_bq function is called during compilation, when the Dataflow template is created and uploaded to GCS; it does not get called at runtime. So whenever my Cloud Function launches the template, the job always reads from the data_2020_06_01_json table, and the table in the query stays the same even as we move into July, August, and so on. What I really want is for the query to change dynamically based on query_bq, so that in the future it reads from data_2020_07_01_json, data_2020_08_01_json, and so on.
I have also looked into the generated template file, and it looks like the query is hard-coded into the template after compilation. Here's a snippet:
"name": "beamapp-xxxxx-0629014535-344920",
"steps": [
{
"kind": "ParallelRead",
"name": "s1",
"properties": {
"bigquery_export_format": "FORMAT_AVRO",
"bigquery_flatten_results": true,
"bigquery_query": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10",
"bigquery_use_legacy_sql": false,
"display_data": [
{
"key": "source",
"label": "Read Source",
"namespace": "apache_beam.runners.dataflow.ptransform_overrides.Read",
"shortValue": "BigQuerySource",
"type": "STRING",
"value": "apache_beam.io.gcp.bigquery.BigQuerySource"
},
{
"key": "query",
"label": "Query",
"namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
"type": "STRING",
"value": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10"
},
{
"key": "validation",
"label": "Validation Enabled",
"namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
"type": "BOOLEAN",
"value": false
}
],
"format": "bigquery",
"output_info": [
{
I also tried the ValueProvider as defined here https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#pipeline-io-and-runtime-parameters and added this to my code:
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--query_bq', type=str)


user_options = pipeline_options.view_as(UserOptions)

p | "Read from BigQuery" >> beam.io.Read(
    beam.io.BigQuerySource(query=user_options.query_bq,
                           use_standard_sql=True))
And when I run this, I get this error:
WARNING:apache_beam.utils.retry:Retry with exponential backoff: waiting for 3.9023594566785924 seconds before retrying get_query_location because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'str'> for field query, found SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10 (type <class 'apache_beam.options.value_provider.StaticValueProvider'>)
So I'm guessing beam.io.BigQuerySource does not accept ValueProviders.
Upvotes: 2
Views: 2000
Reputation: 11021
You cannot use ValueProviders in BigQuerySource, but as of the more recent versions of Beam, you can use beam.io.ReadFromBigQuery, which supports them well.
You would do:

result = (p
          | beam.io.ReadFromBigQuery(query=options.input_query,
                                     ....))
You can pass value providers, and it has a lot of other utilities. Check out its documentation.
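A minimal sketch of what that could look like, assuming a runtime option named --input_query (the option name and the surrounding boilerplate are illustrative, not taken from the original pipeline):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Deferred runtime parameter: resolved when the template is launched,
        # not when the template is compiled.
        parser.add_value_provider_argument('--input_query', type=str)


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    user_options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        result = (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=user_options.input_query,
                use_standard_sql=True)
        )

The launcher (your Cloud Function) would then compute the month's query itself and pass it at launch time through the parameters field of the templates().launch() body, e.g. 'parameters': {'input_query': '...'}, so nothing query-related is baked into the template.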
Upvotes: 1