Reputation: 110
I have created the DAG below to run a SQL script daily. How can I save the query results to a JSON file in the DAG folder in Google Composer?
import datetime

import airflow
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator

START_DATE = datetime.datetime(2020, 3, 1)

default_args = {
    'owner': 'Alen',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=15),
    'start_date': START_DATE,
}

with airflow.DAG(
        'Dag_Name',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    task_name = bigquery_operator.BigQueryOperator(
        task_id='task_name',
        sql='query.sql',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        destination_dataset_table='Project.Dataset.destination_table')
Upvotes: 0
Views: 669
Reputation: 896
One alternative is to run an export from BigQuery to GCS with the DAG folder as your destination.
You can use the BashOperator to call the bq command-line tool.
Then run something like this at the end of your script:
copy_files_to_DAG_folder = bash_operator.BashOperator(
    task_id='Copy_files_to_GCS',
    bash_command='bq extract --destination_format NEWLINE_DELIMITED_JSON '
                 '--print_header=false BQ_TABLE GCS_DAG_FOLDER_LOCATION')
From the bq extract docs:
bq --location=location extract \
--destination_format format \
--compression compression_type \
--field_delimiter delimiter \
--print_header=boolean \
project_id:dataset.table \
gs://bucket/filename.ext
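If you prefer to stay within Airflow operators rather than shelling out to bq, the contrib BigQueryToCloudStorageOperator performs the same export. A minimal sketch, assuming the destination table from the question and a placeholder Composer bucket (in Composer the DAG folder is backed by gs://&lt;your-composer-bucket&gt;/dags):

from airflow.contrib.operators import bigquery_to_gcs

export_to_dag_folder = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_to_dag_folder',
    source_project_dataset_table='Project.Dataset.destination_table',
    # Placeholder URI: point this at your environment's DAG folder bucket.
    destination_cloud_storage_uris=[
        'gs://<your-composer-bucket>/dags/query_results.json'],
    export_format='NEWLINE_DELIMITED_JSON',
    print_header=False)

# Run the export only after the BigQueryOperator task has written the table.
task_name >> export_to_dag_folder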
Upvotes: 2