Reputation: 1300
Imagine a simple Google Dataflow pipeline. In this pipeline you read from BQ using an Apache Beam function, and depending on the returned PCollection you have to update those rows
Journeys = (p
    | 'Read from BQ' >> beam.io.Read(
        beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

Update = (Journeys
    | 'Updating Journey Table' >> beam.Map(UpdateBQ))

Write = (Journeys
    | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))
The problem with this pipeline is that UpdateBQ is executed once for every element of the PCollection returned by the read (because of beam.Map).
What would be the better way to perform an update on a BigQuery table?
I suppose this could be done without beam.Map, executing a single update that processes the whole input PCollection at once.
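For instance, one pattern that might avoid the per-element calls (just a sketch, untested) would be to collapse the PCollection to a single element with a global combine, so the update callable fires only once:

single_update = (Journeys
    | 'Collapse to one element' >> beam.combiners.Count.Globally()
    | 'Run one UPDATE' >> beam.Map(UpdateBQ))  # UpdateBQ ignores its input, so it simply runs once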
Extra
def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD = "#standardSQL"
    QUERY = STD + "\n" + """UPDATE table SET Field = 'YYY' WHERE Field2 = 'XXX'"""
    # Start the DML statement as an asynchronous query job
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.use_legacy_sql = False
    query_job.begin()
    <...>
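For reference, the same statement could also be run with the current google-cloud-bigquery client (a sketch; the newer client defaults to standard SQL, so the #standardSQL prefix and the legacy-SQL flag are unnecessary):

def UpdateBQ(input=None):
    from google.cloud import bigquery
    client = bigquery.Client()
    # client.query() starts the job; result() blocks until the DML statement finishes
    query_job = client.query("UPDATE table SET Field = 'YYY' WHERE Field2 = 'XXX'")
    query_job.result()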
Possible solution
with beam.Pipeline(options=options) as p:
    Journeys = (p
        | 'Read from BQ' >> beam.io.Read(
            beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
    )

    Write = (Journeys
        | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

UpdateBQ(None)  # runs once, after the pipeline has finished
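The with block only exits once the pipeline has finished, so UpdateBQ() fires after the write completes. Without the context manager, the equivalent would be roughly (sketch):

result = p.run()
result.wait_until_finish()  # block until the job is done
UpdateBQ(None)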
Upvotes: 1
Views: 3264
Reputation: 1525
Are you doing any further transformation in the Beam pipeline after reading from BQ? Or is it just the way you showed in the code, i.e. read from BQ and then fire an update command in BQ? In that case, you don't need Beam at all. Just use a BQ query to update data in one table using another table. BQ best practices suggest avoiding single-row inserts/updates.
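Something along these lines (a sketch; dataset, table and column names are placeholders) updates the target table from another table in a single set-based statement instead of per-row calls:

from google.cloud import bigquery

client = bigquery.Client()
sql = """
UPDATE dataset.target t
SET t.Field = s.Field
FROM dataset.source s
WHERE t.Field2 = s.Field2
"""
client.query(sql).result()  # one UPDATE joined against the other table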
Upvotes: 2