ronencozen

Reputation: 2201

How to automate a BigQuery SQL pipeline

I created a data pipeline using BigQuery SQL. It starts by importing a CSV file from Cloud Storage, then performs different analyses, including predictive modeling using BigQuery ML, geography calculations using geography functions, and KPI computation using analytic functions.

I am able to successfully run the different queries manually, and now I would like to automate the data pipeline.

My first choice was Dataflow SQL, but it turns out that the Dataflow SQL query syntax does not support geography functions.

Dataflow Python is less of an option, since the complete analysis is done in SQL and I would like to keep it this way.

My question is: what other GCP options are available to automate this data pipeline?

Upvotes: 2

Views: 2316

Answers (2)

rmesteves

Reputation: 4075

As I mentioned in a comment, if you need to orchestrate your queries you can use Cloud Composer, a fully managed Airflow cluster.

I created the code below to show you, more or less, how you could orchestrate your queries using this tool. Please note that this is basic code and can be improved in terms of coding standards. The code orchestrates 3 queries:

  1. The first one reads from a public table and writes the result to another table in your project.
  2. The second one reads the table created by the first query and selects the 10000 newest rows based on a date column. After that, it saves the result to a table in your project.
  3. The third one reads the table created in step 2 and calculates some aggregations. After that, it saves the results to another table in your project.

    import datetime
    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    
    """The configuration presented below will run your DAG every five minutes, as specified in the
    schedule_interval property, starting from the datetime specified in the start_date property."""
    
    default_dag_args = {
        'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=1),
        'project_id': "<your_project_id>",
    }
    
    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "*/5 * * * *",
            default_args=default_dag_args) as dag:
    
        run_first_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
            task_id = 'xxxxxxxx',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_second_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `<your_project>.<your_dataset>.orchestration_1` ORDER BY date LIMIT 10000",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
            task_id = 'yyyyyyyy',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_third_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM `<your_project>.<your_dataset>.orchestration_2` GROUP BY r_lat, r_long",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
            task_id = 'zzzzzzzz',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
    
        # Define DAG dependencies.
        run_first_query >> run_second_query >> run_third_query
    

Going step by step:

  • First, some Airflow libraries are imported, such as models and bigquery_operator:

    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    
  • Then a dict named default_dag_args is defined; it will be used later when you create your DAG:

    default_dag_args = {
        'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=1),
        'project_id': "<your_project_id>",
    }
    
  • When you create your DAG, you pass the default_dag_args dict as the default arguments and add the schedule_interval argument, which defines when your DAG should run. You can set this argument with some preset expressions or with CRON expressions, as you can see here (a small variation using a preset is sketched right after this snippet):

    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "*/5 * * * *",
            default_args=default_dag_args) as dag:
    
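As a side note that is not part of the original answer: schedule_interval also accepts Airflow preset strings instead of CRON expressions, and you may want to disable backfilling so Airflow does not retroactively run every interval between start_date and now. The sketch below reuses the default_dag_args defined above; the catchup flag is an assumption on my side, adjust it to your needs:

    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "@hourly",   # preset equivalent to the CRON string "0 * * * *"
            catchup = False,                 # assumption: skip backfill runs between start_date and now
            default_args = default_dag_args) as dag:
        pass  # same BigQueryOperator tasks as in the original example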
  • After that, you can create your operator instances. In this case we are using only the BigQueryOperator:

        run_first_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
            task_id = 'xxxxxxxx',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_second_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `<your_project>.<your_dataset>.orchestration_1` ORDER BY date LIMIT 10000",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
            task_id = 'yyyyyyyy',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_third_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM `<your_project>.<your_dataset>.orchestration_2` GROUP BY r_lat, r_long",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
            task_id = 'zzzzzzzz',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
  • As a last step, we can define the dependencies for the DAG. This piece of code means that the run_second_query operation depends on the completion of run_first_query, and so on (a small variation using lists is sketched right after this snippet):

        run_first_query >> run_second_query >> run_third_query
    
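As an aside that is not part of the original answer: Airflow also accepts lists on either side of the >> operator, which is handy when some tasks are independent of each other and can run in parallel:

        # Only valid if the second and third queries do not depend on each other;
        # both would wait for run_first_query and then run in parallel.
        run_first_query >> [run_second_query, run_third_query]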

Finally, I would like to add this article, which discusses how to set the start_date and schedule_interval correctly when using CRON expressions.
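To illustrate that interaction with the values used above (this is standard Airflow scheduling behavior, not something specific to this DAG):

    import datetime

    start_date = datetime.datetime(2020, 4, 22, 15, 40)
    interval = datetime.timedelta(minutes=5)

    # Airflow triggers a DAG run at the *end* of each schedule interval, so the
    # first run of the DAG above fires around start_date + interval, and its
    # execution_date is the start of that interval (i.e. start_date itself).
    print(start_date + interval)  # 2020-04-22 15:45:00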

Upvotes: 2

Chets

Reputation: 100

BigQuery has a built-in scheduling mechanism (scheduled queries) that is currently a beta feature.

To automate a BQ native SQL pipeline, you can use this utility. Using the CLI (the --schedule flag defines how often the scheduled query runs):

    $ bq query \
    --use_legacy_sql=false \
    --destination_table=mydataset.mytable \
    --display_name='My Scheduled Query' \
    --schedule='every 24 hours' \
    --replace=true \
    'SELECT
      1
    FROM
      mydataset.test'
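
If you prefer to create the scheduled query programmatically rather than via the CLI, a rough sketch using the google-cloud-bigquery-datatransfer Python client could look like the code below. The project, dataset, table names, and query are placeholders, and the exact client surface can differ between library versions, so treat this as a starting point rather than a drop-in solution:

    from google.cloud import bigquery_datatransfer

    client = bigquery_datatransfer.DataTransferServiceClient()

    # Placeholders: replace with your own project, dataset, table and query.
    parent = client.common_project_path("your-project-id")

    transfer_config = bigquery_datatransfer.TransferConfig(
        destination_dataset_id="mydataset",
        display_name="My Scheduled Query",
        data_source_id="scheduled_query",
        params={
            "query": "SELECT 1 FROM mydataset.test",
            "destination_table_name_template": "mytable",
            "write_disposition": "WRITE_TRUNCATE",
        },
        schedule="every 24 hours",
    )

    transfer_config = client.create_transfer_config(
        parent=parent,
        transfer_config=transfer_config,
    )
    print("Created scheduled query '{}'".format(transfer_config.name))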

Upvotes: 0
