Reputation: 1053
I have a use case in which I am downloading some JSON files and parsing them. Depending on the files that are downloaded, the program needs to populate data in different tables. Once the data is loaded into the tables, an email notification must be sent.
For example, if the program needs to populate tables a and b (obtained from table_list), the workflow should look like download >> parse >> [load_table_a, load_table_b] >> send_email
If tables a, b, c, and d are obtained from table_list, the workflow should look like download >> parse >> [load_table_a, load_table_b, load_table_c, load_table_d] >> send_email
Here is what I am trying. Can someone please help out?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime
from download_script import download
from parse_script import parse
from load_2_sf_script import get_table_list, insert_into_sf
from airflow.utils.email import send_email_smtp
default_args = {
    'start_date': datetime(2021, 5, 18)
}

with DAG(
    'Test DAG',
    default_args=default_args,
    catchup=False
) as dag:

    download = PythonOperator(
        task_id='download',
        python_callable=download,
        email_on_failure=True,
        email='[email protected]'
    )

    parse = PythonOperator(
        task_id='parse',
        python_callable=parse,
        email_on_failure=True,
        email='[email protected]'
    )

    # one load task per table returned by get_table_list()
    table_list = get_table_list()
    task_list = []
    for table in table_list:
        task_list.append(
            PythonOperator(
                task_id='load_table_{}'.format(table),
                python_callable=insert_into_sf,
                email_on_failure=True,
                email='[email protected]',
                op_kwargs={'category': table}
            )
        )

    send_email = EmailOperator(
        task_id='send_email',
        to=['[email protected]'],
        subject='Airflow: Success',
        html_content='Dag run completed successfully.'
    )

    download >> parse >> [task for task in task_list] >> send_email
Upvotes: 6
Views: 12144
Reputation: 23
I'm trying to implement something very similar, but we use the pattern where dag = DAG(...) and the tasks are defined at the top level, with the DAG injected via dag=dag. A PythonOperator task calls a function that contains the for loop. It executes, but the dynamic tasks never run. Is there some nuance to getting it to work using this pattern? Pseudocode:
dag = DAG(
    ...
)

def polling_fn(ti, **kwargs):
    for ec2Id in ['a', 'b', 'c']:
        stop_instances_task = EC2StopInstanceOperator(task_id=f"stop_instances_{ec2Id}", aws_conn_id='aws_default', instance_id=ec2Id, dag=dag)
        polling_task >> stop_instances_task >> end_task

load_variables_task = PythonOperator(
    task_id="load_variables",
    python_callable=load_variables_fn,
    provide_context=True
)

start_task = DummyOperator(
    task_id='start',
    dag=dag,
)

polling_task = PythonOperator(
    task_id='polling_task',
    python_callable=polling_fn,
    dag=dag,
)

end_task = DummyOperator(
    task_id='done',
    dag=dag,
)

start_task >> polling_task >> end_task
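For reference, operators instantiated inside a running python_callable are never registered with the DAG the scheduler parsed, which is why the dynamic tasks never run; the loop has to execute at parse time, as in the answer below. A minimal sketch of that variant, assuming Airflow 2.x with the Amazon provider installed; the dag_id and instance ids are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.ec2 import EC2StopInstanceOperator

with DAG(
    'stop_instances_sketch',  # placeholder dag_id
    start_date=datetime(2021, 5, 18),
    schedule_interval=None,
    catchup=False
) as dag:
    start_task = DummyOperator(task_id='start')
    end_task = DummyOperator(task_id='done')

    # This loop runs every time the scheduler parses the file, so all
    # of the stop tasks exist before any task executes.
    for ec2_id in ['a', 'b', 'c']:  # placeholder instance ids
        stop_instances_task = EC2StopInstanceOperator(
            task_id=f'stop_instances_{ec2_id}',
            aws_conn_id='aws_default',
            instance_id=ec2_id,
        )
        start_task >> stop_instances_task >> end_task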
Upvotes: 0
Reputation: 808
Assuming the imports and default_args from your question, this will work:
with DAG(
    'medical_device',
    default_args=default_args,
    catchup=False
) as dag:

    download_task = PythonOperator(
        task_id='download_task',
        python_callable=download,
        email_on_failure=True,
        email='[email protected]'
    )

    parse_task = PythonOperator(
        task_id='parse_task',
        python_callable=parse,
        email_on_failure=True,
        email='[email protected]'
    )

    send_email = EmailOperator(
        task_id='send_email',
        to=['[email protected]'],
        subject='Airflow: Success',
        html_content='Dag run completed successfully.'
    )

    download_task >> parse_task

    table_list = get_table_list()
    for table in table_list:
        op = PythonOperator(
            task_id='load_table_{}'.format(table),
            python_callable=insert_into_sf,
            email_on_failure=True,
            email='[email protected]',
            op_kwargs={'category': table}
        )
        # fan-out/fan-in: each load task runs after parse_task
        # and before send_email
        parse_task >> op >> send_email
You don't need to construct the list; you can set the upstream and downstream relations dynamically inside the for loop using parse_task >> op >> send_email.
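If you would rather keep the list from the question, that shape also works: Airflow accepts a list on one side of >>, so the fan-out and fan-in can be declared in a single line. A sketch reusing the question's variables:

load_tasks = []
for table in table_list:
    load_tasks.append(
        PythonOperator(
            task_id='load_table_{}'.format(table),
            python_callable=insert_into_sf,
            email_on_failure=True,
            email='[email protected]',
            op_kwargs={'category': table}
        )
    )

# A plain list works on either side of >> (though not on both sides
# at once), so no list comprehension is needed.
parse_task >> load_tasks >> send_email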
Tip: try to keep your task_ids in line with the names of the task variables. That's not required, but it's good practice.
Upvotes: 8