Reputation: 691
I am writing a Dataflow job which reads from BigQuery and does a few transformations.
data = (
    pipeline
    | beam.io.ReadFromBigQuery(query='''
          SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
      ''', use_standard_sql=True)
    | beam.Map(print)
)
But my requirement is to read from BigQuery only after receiving a notification from a PubSub topic. The above Dataflow job should start reading data from BigQuery only if the message below is received; if it has a different job id or a different status, no action should be taken.
PubSub Message : {'job_id':101, 'status': 'Success'}
Any help on this part?
Upvotes: 0
Views: 658
Reputation: 1166
That is fairly easy; the code would look like this:
import json

pubsub_msg = (
    pipeline
    # pass either topic or subscription, not both
    | beam.io.gcp.pubsub.ReadFromPubSub(subscription=my_subscription)
    # PubSub messages arrive as bytes, so decode and parse the JSON payload
    | beam.Map(lambda raw: json.loads(raw.decode('utf-8')))
)

bigquery_data = (
    pubsub_msg
    | beam.Filter(lambda msg: msg['job_id'] == 101)  # you might want to use a more sophisticated filter condition
    | beam.io.ReadFromBigQuery(query='''
          SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
      ''', use_standard_sql=True)
)

bigquery_data | beam.Map(print)
However, if you do it like that, you will have a streaming Dataflow job running indefinitely (or until you cancel it), since using ReadFromPubSub automatically results in a streaming job. Consequently, this does not start a Dataflow job when a message arrives in PubSub; rather, one job is already running and listening to the topic for something to do.
If you actually want to trigger a Dataflow batch job, I would recommend using a Dataflow template and starting that template from a Cloud Function which listens to your PubSub topic. The filtering logic would then live inside that Cloud Function (as a simple if condition).
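For illustration, here is a minimal sketch of such a PubSub-triggered Cloud Function. The project, region, template path, and job name are placeholders, and it assumes a classic Dataflow template launched via the googleapiclient Dataflow API; adapt the names and the filter condition to your setup.

import base64
import json

from googleapiclient.discovery import build

# Placeholder values; replace with your own project, region and template path.
PROJECT = 'my-project'
REGION = 'europe-west1'
TEMPLATE_GCS_PATH = 'gs://my-bucket/templates/bq_read_template'

def pubsub_trigger(event, context):
    """Background Cloud Function triggered by a PubSub message."""
    # The message payload arrives base64-encoded in event['data'].
    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    # Filtering logic: only react to the expected job_id and status.
    if payload.get('job_id') != 101 or payload.get('status') != 'Success':
        return

    # Launch the Dataflow template that performs the BigQuery read.
    dataflow = build('dataflow', 'v1b3', cache_discovery=False)
    dataflow.projects().locations().templates().launch(
        projectId=PROJECT,
        location=REGION,
        gcsPath=TEMPLATE_GCS_PATH,
        body={'jobName': 'bq-read-job-{}'.format(payload['job_id'])},
    ).execute()

Deployed with a PubSub trigger on the topic (e.g. gcloud functions deploy ... --trigger-topic=my_topic), this launches the batch job only when a matching message arrives; any other message is simply ignored.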
Upvotes: 5
Reputation: 691
I ended up using a Cloud Function: I added the filtering logic to it and started the Dataflow job from there. I found the link below useful: How to trigger a dataflow with a cloud function? (Python SDK)
Upvotes: 1