Alen T Mathew

Reputation: 110

Save BigQuery results to JSON in Google Composer

I have created the DAG below to run a SQL script daily. How can I save the query results to a JSON file and store it in the DAG folder in Google Composer?

import datetime
import airflow
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator

START_DATE = datetime.datetime(2020, 3, 1)

default_args = {
    'owner': 'Alen',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=15),
    'start_date': START_DATE,
}

with airflow.DAG(
        'Dag_Name',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Run query.sql daily and overwrite the destination table with the results
    task_name = bigquery_operator.BigQueryOperator(
        task_id='task_name',
        sql='query.sql',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        destination_dataset_table='Project.Dataset.destination_table')

Upvotes: 0

Views: 669

Answers (1)

Calle Engene

Reputation: 896

One alternative is to run an export from BigQuery to GCS with the DAG folder as your destination.

You can use the BashOperator to run the bq command-line tool.

Then run something like this at the end of your script:

copy_files_to_DAG_folder = bash_operator.BashOperator(
    task_id='Copy_files_to_GCS',
    bash_command='bq extract --destination_format NEWLINE_DELIMITED_JSON '
                 '--print_header=false BQ_TABLE GCS_DAG_FOLDER_LOCATION')

From the docs:

 bq --location=location extract \
 --destination_format format \
 --compression compression_type \
 --field_delimiter delimiter \
 --print_header=boolean \
 project_id:dataset.table \
 gs://bucket/filename.ext
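If you prefer to keep everything in Python, a rough alternative sketch (assuming the Airflow 1.x contrib operators used in the question; the bucket and file names below are placeholders, not values from the question) is BigQueryToCloudStorageOperator, which runs the same extract job:

from airflow.contrib.operators import bigquery_to_gcs

# Sketch only: 'your-composer-bucket' and 'query_results.json' are hypothetical;
# Composer reads DAG files from the bucket's /dags/ prefix.
export_to_dag_folder = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_to_dag_folder',
    source_project_dataset_table='Project.Dataset.destination_table',
    destination_cloud_storage_uris=[
        'gs://your-composer-bucket/dags/query_results.json'],
    export_format='NEWLINE_DELIMITED_JSON',
    print_header=False)

# Chain it after the query task so the export sees the fresh results
task_name >> export_to_dag_folder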

Upvotes: 2
