Reputation: 2592
I have the following Airflow dag:
start_task = DummyOperator(task_id='start_task', dag=dag)
gcs_export_uri_template = 'adstest/2018/08/31/*'
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_ads_to_BigQuery',
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='CSV',
source_objects=[gcs_export_uri_template],
schema_fields=dc(),
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
skip_leading_rows = 1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
start_task >> update_bigquery
This dag load data from adstest/2018/08/31/*
to BigQuery and it works great.
I want to modify the Dag to run over dates based on execution date
:
Execution date
Execution date - 1 days
Execution date - 2 days
Example if Execution date is 2018-09-02
I want the DAG to go to:
Execution date : adstest/2018/09/02/*
Execution date - 1 days : adstest/2018/09/01/*
Execution date - 2 days : adstest/2018/08/31/*
How can I do that?
Edit: This is my updated code:
for i in range(5, 0, -1):
gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -{0}), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_ads_to_BigQuery-{}'.format(i),
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='CSV',
source_objects=gcs_export_uri_template,
schema_fields=dc(),
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
skip_leading_rows=1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
start_task >> update_bigquery
Edit 2:
My code:
for i in range(5, 0, -1):
gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_ads_to_BigQuery-{}'.format(i),
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='CSV',
source_objects=gcs_export_uri_template,
schema_fields=dc(),
params={'i': i},
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
skip_leading_rows=1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
The code gives this error:
"Source URI must not contain the ',' character: gs://adstest/{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }/*">
Upvotes: 0
Views: 1270
Reputation: 18844
You can use Airflow Macros to achieve this as follows:
gcs_export_uri_template=[
"adstest/{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
"adstest/{{ macros.ds_format(prev_ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
"adstest/{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y/%m/%d') }}/*"
]
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_ads_to_BigQuery',
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='CSV',
source_objects=gcs_export_uri_template,
schema_fields=dc(),
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
skip_leading_rows = 1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
When you run the above code, you can check in the Web UI, the rendered parameter:
For EDITED Comment:
You will need to pass the value of the loop variable i
in params
parameter and use it in the string as params.i
as follows:
for i in range(5, 0, -1):
gcs_export_uri_template = ["adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"]
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_ads_to_BigQuery-{}'.format(i),
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='CSV',
source_objects=gcs_export_uri_template,
schema_fields=dc(),
params={'i': i},
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
skip_leading_rows=1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
start_task >> update_bigquery
Upvotes: 3