Hello, I am trying to learn and explore Apache Beam. Below is my Apache Beam script, which runs queries that depend on a sequence of updates. I am getting the following error from BigQuery:
Could not serialize access to table dwingestion:api_staging.apache_beam_bms_latest_data due to concurrent update
I have tried different approaches, including extending the window and separating the updates into different pipelines, but the error persists. Is there a way to make the script more stable so that it runs the updates sequentially, without updating the table concurrently? (See the sketch after the full script for the kind of sequencing I am aiming for.)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "dwingestion-b033d9535e9d.json"
# GCP project and table configurations
PROJECT_ID = 'dwingestion'
DATASET_ID = 'api_staging'
STAGING_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_data_streaming'
LIVE_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_latest_data'
# SQL query to fetch data from the staging table
STAGING_QUERY = f"""
SELECT *
FROM `{STAGING_TABLE}`
"""
class MergeStagingToLiveTable(beam.DoFn):
    """
    Perform the MERGE operation from the staging table to the live table.
    """
    def __init__(self, project_id, staging_table, live_table):
        self.project_id = project_id
        self.staging_table = staging_table
        self.live_table = live_table

    def setup(self):
        self.bq_client = bigquery.Client(project=self.project_id)

    def process(self, element):
        # Perform the MERGE query
        merge_query = """
        MERGE `dwingestion.api_staging.apache_beam_bms_latest_data` AS live USING (
        SELECT
        #...the rest of the query """
        query_job = self.bq_client.query(merge_query)
        query_job.result()
        yield "MERGE query completed."
class CreateOrReplaceLiveTable(beam.DoFn):
    """
    Perform the CREATE OR REPLACE operation on the live table.
    """
    def __init__(self, project_id, live_table, final_table):
        self.project_id = project_id
        self.live_table = live_table
        self.final_table = final_table

    def setup(self):
        self.bq_client = bigquery.Client(project=self.project_id)

    def process(self, element):
        # Perform the CREATE OR REPLACE query
        create_replace_query = """
        CREATE OR REPLACE TABLE `dwingestion.api.apache_beam_bms_live` AS
        WITH with_country AS (
            SELECT
                *,
                'UG' AS country
            FROM
                `dwingestion.api_staging.apache_beam_bms_latest_data`
        ),
        # ...the rest of the query
        """
        query_job = self.bq_client.query(create_replace_query)
        query_job.result()
def run_pipeline():
    """
    Main function to set up and execute the pipeline.
    """
    pipeline_options = PipelineOptions(
        project='dwingestion',
        runner='DataflowRunner',
        streaming=True,
        temp_location='gs://bodawork_dataflow_template/temp',
        staging_location='gs://bodawork_dataflow_template/staging',
        region='europe-west1',
        job_name='flesp-upsert-streaming-pipeline-dataflow',
        save_main_session=True
    )
    pipeline_options.view_as(StandardOptions).streaming = True

    # Apache Beam pipeline
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read from Staging Table' >> beam.io.ReadFromBigQuery(query=STAGING_QUERY,
                                                                    use_standard_sql=True,
                                                                    method="DIRECT_READ")
            # Step 1: Windowing before the first ParDo
            | 'Window for Merge' >> beam.WindowInto(beam.window.FixedWindows(60))  # 60-second window
            | 'Trigger Merge' >> beam.ParDo(MergeStagingToLiveTable(
                project_id=PROJECT_ID,
                staging_table=STAGING_TABLE,
                live_table=LIVE_TABLE
            ))
        )

    with beam.Pipeline() as pipeline_2:
        (
            pipeline_2
            | 'Window for Create/Replace' >> beam.WindowInto(beam.window.FixedWindows(300))
            | 'Trigger Create/Replace' >> beam.ParDo(CreateOrReplaceLiveTable(
                project_id=PROJECT_ID,
                live_table=LIVE_TABLE,
                final_table='dwingestion.api.apache_beam_bms_live'
            ))
        )


if __name__ == "__main__":
    run_pipeline()
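To clarify what I mean by running the updates sequentially: something roughly like the sketch below, where both DoFns are chained in one pipeline so the CREATE OR REPLACE step only receives an element after the MERGE step has yielded its "MERGE query completed." output for that window. This is only a sketch of the shape I am after, reusing the classes and options from the script above; it is not something I have working.

# Sketch: chain the CREATE OR REPLACE step after the MERGE step in ONE pipeline,
# so it only runs once the MERGE DoFn has emitted its completion element.
with beam.Pipeline(options=pipeline_options) as pipeline:
    merge_done = (
        pipeline
        | 'Read from Staging Table' >> beam.io.ReadFromBigQuery(
            query=STAGING_QUERY, use_standard_sql=True, method="DIRECT_READ")
        | 'Window for Merge' >> beam.WindowInto(beam.window.FixedWindows(60))
        | 'Trigger Merge' >> beam.ParDo(MergeStagingToLiveTable(
            project_id=PROJECT_ID,
            staging_table=STAGING_TABLE,
            live_table=LIVE_TABLE))
    )
    (
        merge_done
        | 'Trigger Create/Replace' >> beam.ParDo(CreateOrReplaceLiveTable(
            project_id=PROJECT_ID,
            live_table=LIVE_TABLE,
            final_table='dwingestion.api.apache_beam_bms_live'))
    )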