oyugi.collins

Reputation: 77

How best to avoid concurrent updates on a table when running BigQuery scripts in an Apache Beam pipeline?

Hello, I am trying to learn and explore Apache Beam. Below is my Apache Beam pipeline, which runs queries that depend on a sequence of updates. I am getting the following error from BigQuery:

Could not serialize access to table dwingestion:api_staging.apache_beam_bms_latest_data due to concurrent update

I have tried different approaches, including extending the window and separating the updates into different pipelines, but the error persists. Is there a way I can make the script run the updates sequentially instead of updating the table concurrently?
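
For what it's worth, the simplest workaround I can think of is to catch the aborted job and retry it with a backoff, something like the sketch below. Note that run_query_with_retry, the retry count, the delay, and the substring check on the error message are all my own guesses, not anything taken from the BigQuery docs:

import time
from google.api_core.exceptions import GoogleAPICallError


def run_query_with_retry(bq_client, sql, max_retries=5, base_delay=2):
    """Retry a DML job that BigQuery aborted because of a concurrent update.

    When mutating statements on the same table conflict, BigQuery aborts
    one of them with "Could not serialize access ... due to concurrent
    update"; backing off and retrying is one way to recover. The retry
    count, delay, and substring match here are my own assumptions.
    """
    for attempt in range(max_retries):
        try:
            job = bq_client.query(sql)
            return job.result()  # Block until the job finishes or fails.
        except GoogleAPICallError as exc:
            if "concurrent update" in str(exc) and attempt < max_retries - 1:
                time.sleep(base_delay * 2 ** attempt)  # Exponential backoff.
            else:
                raise

# usage inside a DoFn: run_query_with_retry(self.bq_client, merge_query)

But this feels like treating the symptom, so I would prefer a way to stop the statements from overlapping in the first place.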

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "dwingestion-b033d9535e9d.json"

# GCP project and table configurations
PROJECT_ID = 'dwingestion'
DATASET_ID = 'api_staging'

STAGING_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_data_streaming'
LIVE_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_latest_data'

# SQL query to fetch data from the staging table
STAGING_QUERY = f"""
SELECT *
FROM `{STAGING_TABLE}`
"""


class MergeStagingToLiveTable(beam.DoFn):
    """
    Perform the MERGE operation from the staging table to the live table.
    """

    def __init__(self, project_id, staging_table, live_table):
        self.project_id = project_id
        self.staging_table = staging_table
        self.live_table = live_table

    def setup(self):
        self.bq_client = bigquery.Client(project=self.project_id)

    def process(self, element):
        # Perform the MERGE query
        merge_query = """
        MERGE `dwingestion.api_staging.apache_beam_bms_latest_data` AS live USING (
          SELECT 
           #...the rest of the query """

        query_job = self.bq_client.query(merge_query)
        query_job.result()
        yield "MERGE query completed."


class CreateOrReplaceLiveTable(beam.DoFn):
    """
    Perform the CREATE OR REPLACE operation on the live table.
    """

    def __init__(self, project_id, live_table, final_table):
        self.project_id = project_id
        self.live_table = live_table
        self.final_table = final_table

    def setup(self):
        self.bq_client = bigquery.Client(project=self.project_id)

    def process(self, element):
        # Perform the CREATE OR REPLACE query
        create_replace_query = """
        CREATE OR REPLACE TABLE `dwingestion.api.apache_beam_bms_live` AS
        WITH with_country AS (
            SELECT 
              * ,
              'UG' AS country 
            FROM 
              `dwingestion.api_staging.apache_beam_bms_latest_data`
            ), 
        # ...the rest of the query
        """
        query_job = self.bq_client.query(create_replace_query)
        query_job.result()


def run_pipeline():
    """
    Main function to set up and execute the pipeline.
    """
    pipeline_options = PipelineOptions(
        project='dwingestion',
        runner='DataflowRunner',
        streaming=True,
        temp_location='gs://bodawork_dataflow_template/temp',
        staging_location='gs://bodawork_dataflow_template/staging',
        region='europe-west1',
        job_name='flesp-upsert-streaming-pipeline-dataflow',
        save_main_session=True
    )
    pipeline_options.view_as(StandardOptions).streaming = True

    # Apache Beam pipeline
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read from Staging Table' >> beam.io.ReadFromBigQuery(
                query=STAGING_QUERY,
                use_standard_sql=True,
                method="DIRECT_READ")
            # Step 1: Windowing before the first ParDo
            | 'Window for Merge' >> beam.WindowInto(beam.window.FixedWindows(60))  # 60-second window
            | 'Trigger Merge' >> beam.ParDo(MergeStagingToLiveTable(
                project_id=PROJECT_ID,
                staging_table=STAGING_TABLE,
                live_table=LIVE_TABLE))
        )
    with beam.Pipeline() as pipeline_2:
        (
            pipeline_2
            # A pipeline needs a root transform; Create supplies a single
            # dummy element so the downstream ParDo has something to fire on.
            | 'Create Trigger' >> beam.Create([None])
            | 'Window for Create/Replace' >> beam.WindowInto(beam.window.FixedWindows(300))
            | 'Trigger Create/Replace' >> beam.ParDo(CreateOrReplaceLiveTable(
                project_id=PROJECT_ID,
                live_table=LIVE_TABLE,
                final_table='dwingestion.api.apache_beam_bms_live'))
        )


if __name__ == "__main__":
    run_pipeline()
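
For completeness, the direction I am currently exploring is a single pipeline where each statement consumes the output of the previous one, so they cannot overlap, and where all rows in a window are collapsed to one element so only one MERGE job is launched per window (I suspect the per-row ParDo is what launches the colliding jobs). This reuses the names from the script above, would replace the body of run_pipeline, and I have not verified it on Dataflow:

# Untested sketch: chain both DoFns in one pipeline and collapse each
# window to a single element so only one MERGE job runs at a time.
with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | 'Read from Staging Table' >> beam.io.ReadFromBigQuery(
            query=STAGING_QUERY,
            use_standard_sql=True,
            method="DIRECT_READ")
        | 'Window for Merge' >> beam.WindowInto(beam.window.FixedWindows(60))
        # Without these two steps, the ParDo runs once per row, so every
        # row read from staging would launch its own MERGE job.
        | 'Key All Rows' >> beam.Map(lambda row: (None, row))
        | 'One Element Per Window' >> beam.GroupByKey()
        | 'Trigger Merge' >> beam.ParDo(MergeStagingToLiveTable(
            project_id=PROJECT_ID,
            staging_table=STAGING_TABLE,
            live_table=LIVE_TABLE))
        # "MERGE query completed." is only emitted after query_job.result()
        # returns, so this step can never overlap the MERGE statement.
        | 'Trigger Create/Replace' >> beam.ParDo(CreateOrReplaceLiveTable(
            project_id=PROJECT_ID,
            live_table=LIVE_TABLE,
            final_table='dwingestion.api.apache_beam_bms_live'))
    )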

Upvotes: 0

Views: 32

Answers (0)
