Reputation: 2201
I created a data pipeline using BigQuery SQL. It starts by importing a CSV file from Cloud Storage, then performs different analyses, including predictive modeling using BigQuery ML, geography calculations using geography functions, and KPI computation using analytic functions.
I am able to run the different queries manually, and now I would like to automate the data pipeline.
My first choice was Dataflow SQL, but it turns out that the Dataflow SQL query syntax does not support geography functions.
Dataflow Python is less of an option, since the complete analysis is done in SQL and I would like to keep it this way.
My question is: what other GCP options are available to automate this data pipeline?
Upvotes: 2
Views: 2316
Reputation: 4075
As I mentioned in a comment, if you need to orchestrate your queries you can use Cloud Composer, a fully managed Airflow cluster.
I created the code below to show you more or less how you could orchestrate your queries using this tool. Please note that this is basic code and can be improved in terms of coding standards. The code basically orchestrates three queries:
- The first one reads a public BigQuery table and saves the result to a table in your project.
- The second one reads the table created in step 1, orders it by date with a row limit, and saves the result to another table.
- The third one reads the table created in step 2 and calculates some aggregations. After that, it saves the results to another table in your project.
import datetime

from airflow import models
from airflow.contrib.operators import bigquery_operator

# The configuration below will run your DAG every five minutes, as specified in the
# schedule_interval property, starting from the datetime specified in the start_date property.
default_dag_args = {
    'start_date': datetime.datetime(2020, 4, 22, 15, 40),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'project_id': "<your_project_id>",
}

with models.DAG(
        'composer_airflow_bigquery_orchestration',
        schedule_interval="*/5 * * * *",
        default_args=default_dag_args) as dag:

    # First query: read a public table and save the result to a table in your project.
    run_first_query = bigquery_operator.BigQueryOperator(
        sql="SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
        destination_dataset_table="<your_project>.<your_dataset>.orchestration_1",
        task_id='xxxxxxxx',
        write_disposition="WRITE_TRUNCATE",
        # create_disposition="",
        allow_large_results=True,
        use_legacy_sql=False
    )

    # Second query: read the table created in step 1 and save an ordered subset to another table.
    run_second_query = bigquery_operator.BigQueryOperator(
        sql="SELECT * FROM `<your_project>.<your_dataset>.orchestration_1` ORDER BY date LIMIT 10000",
        destination_dataset_table="<your_project>.<your_dataset>.orchestration_2",
        task_id='yyyyyyyy',
        write_disposition="WRITE_TRUNCATE",
        # create_disposition="",
        allow_large_results=True,
        use_legacy_sql=False
    )

    # Third query: aggregate the table created in step 2 and save the results to a third table.
    run_third_query = bigquery_operator.BigQueryOperator(
        sql="SELECT ROUND(lat) r_lat, ROUND(long) r_long, COUNT(1) total FROM `<your_project>.<your_dataset>.orchestration_2` GROUP BY r_lat, r_long",
        destination_dataset_table="<your_project>.<your_dataset>.orchestration_3",
        task_id='zzzzzzzz',
        write_disposition="WRITE_TRUNCATE",
        # create_disposition="",
        allow_large_results=True,
        use_legacy_sql=False
    )

    # Define DAG dependencies: each query runs only after the previous one finishes.
    run_first_query >> run_second_query >> run_third_query
Going step by step:
First, some Airflow libraries, such as models and bigquery_operator, are imported:
from airflow import models
from airflow.contrib.operators import bigquery_operator
Then a dict named default_dag_args is defined; it will be used later when you create your DAG.
default_dag_args = {
    'start_date': datetime.datetime(2020, 4, 22, 15, 40),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'project_id': "<your_project_id>",
}
When you create your DAG, you pass the default_dag_args dict as the default_args argument and set the schedule_interval argument, which defines when your DAG should run. You can use this argument with some preset expressions or with CRON expressions, as you can see here
with models.DAG(
        'composer_airflow_bigquery_orchestration',
        schedule_interval="*/5 * * * *",
        default_args=default_dag_args) as dag:
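As a side note, if you would rather not write a CRON string, the same declaration can use one of Airflow's preset expressions instead. A minimal sketch (the @hourly preset here is just an example, not part of the original pipeline):
with models.DAG(
        'composer_airflow_bigquery_orchestration',
        schedule_interval="@hourly",  # preset expression instead of a CRON string
        default_args=default_dag_args) as dag:
    ...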
After that, you can create your operator instances. In this case, only the BigQueryOperator is used:
    run_first_query = bigquery_operator.BigQueryOperator(
        sql="SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
        destination_dataset_table="<your_project>.<your_dataset>.orchestration_1",
        task_id='xxxxxxxx',
        write_disposition="WRITE_TRUNCATE",
        # create_disposition="",
        allow_large_results=True,
        use_legacy_sql=False
    )

    run_second_query = bigquery_operator.BigQueryOperator(
        sql="SELECT * FROM `<your_project>.<your_dataset>.orchestration_1` ORDER BY date LIMIT 10000",
        destination_dataset_table="<your_project>.<your_dataset>.orchestration_2",
        task_id='yyyyyyyy',
        write_disposition="WRITE_TRUNCATE",
        # create_disposition="",
        allow_large_results=True,
        use_legacy_sql=False
    )

    run_third_query = bigquery_operator.BigQueryOperator(
        sql="SELECT ROUND(lat) r_lat, ROUND(long) r_long, COUNT(1) total FROM `<your_project>.<your_dataset>.orchestration_2` GROUP BY r_lat, r_long",
        destination_dataset_table="<your_project>.<your_dataset>.orchestration_3",
        task_id='zzzzzzzz',
        write_disposition="WRITE_TRUNCATE",
        # create_disposition="",
        allow_large_results=True,
        use_legacy_sql=False
    )
As a last step, we define the dependencies for the DAG. This piece of code means that the run_second_query operation depends on the completion of run_first_query, and so on.
run_first_query >> run_second_query >> run_third_query
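If parts of your pipeline are independent, for example a geography calculation and a KPI computation that both read the same staging table, the dependencies can also fan out so those tasks run in parallel. A minimal sketch, assuming hypothetical task names run_geography_query and run_kpi_query defined with the same BigQueryOperator pattern as above:
    # run_first_query must finish before either downstream task starts;
    # run_geography_query and run_kpi_query then run in parallel.
    run_first_query >> [run_geography_query, run_kpi_query]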
Finally, I would like to add this article, which discusses how to set the start_date and schedule_interval correctly when using CRON expressions.
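To illustrate why that matters: Airflow only triggers a DAG run after the schedule interval it covers has closed, so with the values used above the first run does not start at start_date itself. A small sketch of the expected timing:
import datetime

# With start_date = 2020-04-22 15:40 and schedule_interval = "*/5 * * * *",
# the first DAG run covers the 15:40-15:45 interval and is triggered around 15:45,
# i.e. one full interval after start_date.
start_date = datetime.datetime(2020, 4, 22, 15, 40)
interval = datetime.timedelta(minutes=5)
first_run_triggered_at = start_date + interval  # 2020-04-22 15:45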
Upvotes: 2
Reputation: 100
BigQuery has a built-in scheduling mechanism, scheduled queries, which is currently a beta feature.
To automate a BigQuery-native SQL pipeline, you can use this feature. Using the CLI:
$ bq query \
--use_legacy_sql=false \
--destination_table=mydataset.mytable \
--display_name='My Scheduled Query' \
--schedule='every 24 hours' \
--replace=true \
'SELECT
  1
FROM
  mydataset.test'
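If you prefer to set up the scheduled query programmatically instead of from the CLI, the same feature is exposed through the BigQuery Data Transfer Service API. A minimal sketch using the Python client, assuming the google-cloud-bigquery-datatransfer library is installed and that the project, dataset, table, and query below are placeholders:
from google.cloud import bigquery_datatransfer

client = bigquery_datatransfer.DataTransferServiceClient()
parent = client.common_project_path("<your_project_id>")

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="mydataset",
    display_name="My Scheduled Query",
    data_source_id="scheduled_query",  # scheduled queries are backed by the Data Transfer Service
    params={
        "query": "SELECT 1 FROM mydataset.test",
        "destination_table_name_template": "mytable",
        "write_disposition": "WRITE_TRUNCATE",
    },
    schedule="every 24 hours",
)

# Creates the scheduled query; it will run on the given schedule from now on.
transfer_config = client.create_transfer_config(
    parent=parent,
    transfer_config=transfer_config,
)
print("Created scheduled query:", transfer_config.name)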
Upvotes: 0