MassyB

Reputation: 1184

Airflow BigQueryOperator: how to save query result in a partitioned Table?

I have a simple DAG:

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')

    end = DummyOperator(task_id='end')

    sql = """
             SELECT *
             FROM `another_dataset.another_table`
          """

    bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='my_dataset.my_table20180524',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                query_params={})
    start >> bq_query >> end

When executing the bq_query task, the SQL query result gets saved in a sharded table. I want it saved in a daily partitioned table instead. To do so, I only changed destination_dataset_table to my_dataset.my_table$20180524. I got the error below when executing the bq_query task:

Partitioning specification must be provided in order to create partitioned table

How can I tell BigQuery to save the query result to a daily partitioned table? My first guess was to use query_params in BigQueryOperator, but I didn't find any example of how to use that parameter.

EDIT:

I'm using the google-cloud==0.27.0 Python client ... and it's the one used in prod :(

Upvotes: 6

Views: 14309

Answers (4)

Abhijit

Reputation: 363

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

DEFAULT_DAG_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'project_id': Variable.get('gcp_project'),
    'zone': Variable.get('gce_zone'),
    'region': Variable.get('gce_region'),
    'location': Variable.get('gce_zone'),
}

with DAG(
    'test',
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
    catchup=False,
    default_args=DEFAULT_DAG_ARGS) as dag:

    bq_query = BigQueryOperator(
        task_id='create-partition',
        bql="""SELECT
                * 
                FROM
                `dataset.table_name`""",   -- table from which you want to pull data
        destination_dataset_table='project.dataset.table_name' + '$' + datetime.now().strftime('%Y%m%d'),             -- Auto partitioned table in Bq 
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        use_legacy_sql=False,
    )

I recommend using Airflow Variables for these fields and referencing them in the DAG, as in DEFAULT_DAG_ARGS above. With the code above, a partition for today's date will be added to the BigQuery table.
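For reference, here is a minimal sketch of seeding the Variables that DEFAULT_DAG_ARGS reads (the values are hypothetical; in practice they are usually set through the Airflow UI or the airflow variables CLI command):

from airflow.models import Variable

# Hypothetical values -- replace with your own project, zone and region.
Variable.set('gcp_project', 'my-project')
Variable.set('gce_zone', 'europe-west1-b')
Variable.set('gce_region', 'europe-west1')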

Upvotes: 0

codninja0908

Reputation: 567

With BigQueryOperator you can pass the time_partitioning parameter, which creates an ingestion-time partitioned table:

bq_cmd = BigQueryOperator(
    task_id="task_id",
    sql=[query],
    destination_dataset_table=destination_tbl,
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    # Per the BigQuery API spec, the partitioning type key is 'type'.
    time_partitioning={'type': 'DAY'},
    allow_large_results=True,
    trigger_rule='all_success',
    query_params=query_params,
    dag=dag,
)

Upvotes: 0

gruby

Reputation: 990

You first need to create an empty partitioned destination table. Follow the instructions here: link to create an empty partitioned table.
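As a rough sketch (not part of the original answer), the empty partitioned table could also be created from Airflow itself with BigQueryCreateEmptyTableOperator, assuming a contrib version that supports the time_partitioning argument; the schema fields below are hypothetical:

from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator

create_table = BigQueryCreateEmptyTableOperator(
    task_id='create_partitioned_table',
    dataset_id='my_dataset',
    table_id='my_table',
    bigquery_conn_id='my_bq_connection',
    # Hypothetical schema -- replace with the real columns.
    schema_fields=[{'name': 'col_a', 'type': 'STRING', 'mode': 'NULLABLE'}],
    # Daily ingestion-time partitioning, as per the BigQuery API spec.
    time_partitioning={'type': 'DAY'},
)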

Then run the Airflow pipeline below again. You can try this code:

import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date

with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    sql = """
         SELECT *
         FROM `another_dataset.another_table`
          """

    bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='{{ params.t_name }}',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                # params (not query_params) feeds the {{ params.t_name }} template
                                params={'t_name': table_name},
                                dag=dag
                                )

    start >> bq_query >> end

So what I did was create a dynamic table name variable and pass it to the BigQuery operator.

Upvotes: 9

MassyB

Reputation: 1184

The main issue here is that I don't have access to the new version of the google-cloud Python API; prod is using version 0.27.0. So, to get the job done, I did something bad and dirty:

  • saved the query result in a sharded table, let it be table_sharded
  • got table_sharded's schema, let it be table_schema
  • saved " SELECT * FROM dataset.table_sharded" query to a partitioned table providing table_schema

All this is abstracted in a single operator that uses a hook. The hook is responsible for creating/deleting tables/partitions, getting the table schema, and running queries on BigQuery.

Have a look at the code. If there is any other solution, please let me know.
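(This is not the author's actual code.) A rough sketch of that workflow, using cursor methods that exist on later contrib versions of Airflow's BigQueryHook (run_query, get_schema, create_empty_table, run_table_delete); all project, dataset and table names below are hypothetical:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

def query_into_partition(sql, dataset, table, partition_date,
                         conn_id='my_bq_connection'):
    """Run `sql` into dataset.table$partition_date via a sharded staging table."""
    cursor = BigQueryHook(bigquery_conn_id=conn_id).get_conn().cursor()
    staging = '{}.{}_{}'.format(dataset, table, partition_date)

    # 1. Save the query result in a sharded staging table.
    cursor.run_query(sql,
                     destination_dataset_table=staging,
                     write_disposition='WRITE_TRUNCATE',
                     use_legacy_sql=False)

    # 2. Get the staging table's schema.
    schema = cursor.get_schema(dataset_id=dataset,
                               table_id='{}_{}'.format(table, partition_date))

    # 3. Create the day-partitioned target table with that schema
    #    (assumes it does not exist yet) ...
    cursor.create_empty_table(project_id='my-project',  # hypothetical project id
                              dataset_id=dataset,
                              table_id=table,
                              schema_fields=schema['fields'],
                              time_partitioning={'type': 'DAY'})

    # ... and copy the staging data into the target partition.
    cursor.run_query('SELECT * FROM `{}`'.format(staging),
                     destination_dataset_table='{}.{}${}'.format(dataset, table, partition_date),
                     write_disposition='WRITE_TRUNCATE',
                     use_legacy_sql=False)

    # 4. Drop the staging table.
    cursor.run_table_delete(staging)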

Upvotes: 0
