Reputation: 1043
My use-case is as follows:
- Task A generates a dataset from some raw input data.
- Task B runs some code using that dataset as input.
- Task C runs some other code using that dataset as input.
The three tasks are scheduled to run daily. Task B and Task C are scheduled to run long enough after Task A, and they simply fail if the input dataset has not been generated for some reason.
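For reference, this is roughly what the current setup looks like (the DAG names, the DummyOperator placeholders and the 01:00/03:00 schedules below are purely illustrative, not my actual code):
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

start = datetime(2018, 1, 1)

# Task A generates the dataset early in the morning...
dag_a = DAG('dag_a', schedule_interval='0 1 * * *', start_date=start)
task_a = DummyOperator(task_id='task_a', dag=dag_a)

# ...and Task B and Task C run two hours later, simply assuming that the
# dataset is already there.
dag_b = DAG('dag_b', schedule_interval='0 3 * * *', start_date=start)
task_b = DummyOperator(task_id='task_b', dag=dag_b)

dag_c = DAG('dag_c', schedule_interval='0 3 * * *', start_date=start)
task_c = DummyOperator(task_id='task_c', dag=dag_c)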
As a first improvement I added an ExternalTaskSensor to both Task B and Task C, but this only prevents them from running if Task A has not finished yet or has failed.
However, ExternalTaskSensor does not seem to play well with backfilling (it is pretty fragile as it relies on the execution date only; also, if Task A is run again, Task B and Task C won't know).
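For completeness, the sensor I added to Task B's DAG looks roughly like this (same illustrative names and schedules as above; the execution_delta is needed precisely because the sensor matches runs by execution date):
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag_b = DAG('dag_b', schedule_interval='0 3 * * *',
            start_date=datetime(2018, 1, 1))

# dag_b runs at 03:00 while dag_a runs at 01:00, so the sensor has to look
# two hours back to find the dag_a run for the same day.
wait_for_a = ExternalTaskSensor(
    task_id='wait_for_task_a',
    external_dag_id='dag_a',
    external_task_id='task_a',
    execution_delta=timedelta(hours=2),
    timeout=60 * 60,  # stop poking after an hour
    dag=dag_b)

task_b = DummyOperator(task_id='task_b', dag=dag_b)

wait_for_a >> task_b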
Solution 1 (not applicable): I have seen this SO question: In airflow, is there a good way to call another dag's task?
This is not ideal for me because I'd like to keep Task A unaware of the dependent tasks and handle the logic in Task B and Task C (or externally). The reason is that other tasks consuming the output of Task A will be added in the future (by different teams across the organization), and it's not desirable to update Task A each time.
Summary
I'd like to trigger Task B and Task C if and only if Task A has completed successfully (regardless of whether it was scheduled or triggered manually), without modifying Task A to achieve that.
Upvotes: 0
Views: 1413
Reputation: 910
To suit your scenario, the only concept I can think of is SubDAGs (refer to the WARNING below before you implement this).
SubDagOperator allows you to attach a set of tasks downstream of your task_a. Refer to the code below.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

dag = DAG('parent_dag', description='Parent',
          schedule_interval='@daily',
          start_date=datetime.now())
task_a = DummyOperator(dag=dag, task_id='task_a')

# task_id must match the suffix of the subdag's dag_id ('parent_dag.load_tasks')
subdag_task = SubDagOperator(task_id='load_tasks',
                             subdag=load_subdag('parent_dag', 'load_tasks'),
                             dag=dag)
task_a >> subdag_task
Now, in a separate file, you define your load_subdag function.
# Defined in a separate file
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def load_subdag(parent_dag_name, child_dag_name):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name),
        schedule_interval='@daily',
        start_date=datetime.now(),  # should match the parent DAG's start_date
    )
    with dag_subdag:
        task_b = DummyOperator(task_id='load_subdag_task_b')
        task_c = DummyOperator(task_id='load_subdag_task_c')
    return dag_subdag
WARNING (In Red and bold): SubDag tasks occupy slots in your worker like maggots. Please understand the caveats completely before you jump into this. AIRFLOW-74 gives a picture of how bad it can be. It is outright rejected by many developers for the same reason.
Upvotes: 1