Reputation: 509
I have DAG code in Python that uses an intermediate-table write approach, but I need to write the query results directly to a GCS bucket as a CSV file instead. Also, the CSV file name should include the max date used in the query.
# Query results written to intermediate table
BQ_Output = BigQueryOperator(
    task_id='BQ_Output',
    write_disposition='WRITE_TRUNCATE',
    use_legacy_sql=False,
    allow_large_results=True,
    sql="""
        CREATE OR REPLACE TABLE `schema.result_table` AS (
            SELECT * EXCEPT (store_name, state, week_start_date, season)
            FROM `schema.result_table`
            WHERE week_start_date = (SELECT MAX(week_start_date) FROM `schema.result_table`))
    """,
    params=var_config,
    dag=dag)
# Results Table to GCS Bucket
Bq_GCS = BigQueryToCloudStorageOperator(
    task_id='Bq_GCS',
    source_project_dataset_table='schema.result_table',
    # destination_cloud_storage_uris expects a list of URIs
    destination_cloud_storage_uris=['gs://bucket_path/output_<max_date>_file.csv'],
    export_format='CSV',
    field_delimiter=',',
    dag=dag)
Kindly help me through this. Thanks!
Upvotes: 0
Views: 2553
Reputation: 12234
Consider using EXPORT DATA
DECLARE max_date DATE DEFAULT (SELECT MAX(week_start_date) FROM `schema.result_table`);

EXPORT DATA OPTIONS (
    uri = 'gs://bucket_path/output_' || FORMAT_DATE('%Y%m%d', max_date) || '/_file_*.csv',
    format = 'CSV',
    overwrite = true,
    header = false,
    field_delimiter = ','
) AS
SELECT * EXCEPT (store_name, state, week_start_date, season)
FROM `schema.result_table`
WHERE week_start_date = max_date;
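Note that EXPORT DATA expects a single * wildcard in the uri and may split the output into multiple sharded files, so with the pattern above the max date ends up in the folder name. If you would rather carry it in the file name itself, the uri option could be built like this instead (same placeholder bucket path):

uri = 'gs://bucket_path/output_file_' || FORMAT_DATE('%Y%m%d', max_date) || '_*.csv',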
Below is a sample Airflow DAG task that runs a BigQuery script with BigQueryInsertJobOperator.
BQ_Output = BigQueryInsertJobOperator(
    task_id='BQ_Output',
    configuration={
        "query": {
            "query": """
                <put above script here>
            """,
            "useLegacySql": False
        }
    },
    location='asia-northeast3'
)
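Putting the two pieces together, here is a minimal sketch of the full task, assuming the question's placeholder table `schema.result_table` and bucket path, a DAG object named `dag`, and a location that matches your dataset's region:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# BigQuery script: find the latest week_start_date, then export that week's rows straight to GCS.
EXPORT_SQL = """
DECLARE max_date DATE DEFAULT (SELECT MAX(week_start_date) FROM `schema.result_table`);

EXPORT DATA OPTIONS (
    uri = 'gs://bucket_path/output_' || FORMAT_DATE('%Y%m%d', max_date) || '/_file_*.csv',
    format = 'CSV',
    overwrite = true,
    header = false,
    field_delimiter = ','
) AS
SELECT * EXCEPT (store_name, state, week_start_date, season)
FROM `schema.result_table`
WHERE week_start_date = max_date;
"""

BQ_Output = BigQueryInsertJobOperator(
    task_id='BQ_Output',
    configuration={
        "query": {
            "query": EXPORT_SQL,
            "useLegacySql": False
        }
    },
    location='asia-northeast3',  # replace with your dataset's location
    dag=dag)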
Upvotes: 3