Reputation: 2075
I'm looking for examples of patterns in place for event-driven DAGs, specifically those with dependencies on other DAGs. Let's start with a simple example:
dag_a -> dag_b
dag_b depends on dag_a. I understand that at the end of dag_a I can add a trigger to launch dag_b. However, this feels philosophically misaligned from an abstraction standpoint: dag_a does not need to understand or know that dag_b exists, yet this pattern would put the responsibility of calling dag_b on dag_a.
Let's consider a slightly more complex example (pardon my poor ASCII drawing skills):
dag_a ------> dag_c
            /
dag_b -----/
In this case, dag_c depends on both dag_a and dag_b. I understand that we could set up a sensor for the output of each of dag_a and dag_b, but with the advent of deferrable operators, it doesn't seem that this remains a best practice. I suppose I'm wondering how to set up a DAG of DAGs in an async fashion.
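For concreteness, the sensor-based version of this fan-in that I'm referring to would look roughly like the sketch below (not my actual code; the DAG arguments, intervals, and the placeholder consume task are illustrative):

# Sketch of the sensor-based fan-in described above (illustrative only).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(dag_id="dag_c", start_date=datetime(2022, 1, 1), schedule_interval="@daily"):
    # With external_task_id=None the sensor waits for the whole upstream DAG run
    # (for the same logical date, since all three DAGs run daily) to succeed.
    wait_for_a = ExternalTaskSensor(
        task_id="wait_for_dag_a",
        external_dag_id="dag_a",
        external_task_id=None,
        mode="reschedule",        # releases the worker slot between pokes
        poke_interval=10 * 60,
        timeout=24 * 60 * 60,
    )
    wait_for_b = ExternalTaskSensor(
        task_id="wait_for_dag_b",
        external_dag_id="dag_b",
        external_task_id=None,
        mode="reschedule",
        poke_interval=10 * 60,
        timeout=24 * 60 * 60,
    )
    consume = BashOperator(task_id="consume", bash_command="echo consume upstream partitions")
    [wait_for_a, wait_for_b] >> consume

My understanding is that mode="reschedule" is the pre-deferrable way to avoid pinning a worker slot while waiting, which is part of why I'm asking whether deferrable operators change the recommended pattern here.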
The potential of deferrable operators for event-driven DAGs is introduced in Astronomer's guide here: https://www.astronomer.io/guides/deferrable-operators, but it's unclear how they would best be applied to the examples above.
More concretely, I'm envisioning a use case where multiple DAGs run every day (so they share the same run date), and the output of each DAG is a date partition in a table somewhere. Downstream DAGs consume the partitions of the upstream tables, so we want to schedule them such that downstream DAGs don't attempt to run before the upstream ones complete.
Right now I'm using a "fail fast and often" approach in downstream DAGs: they start running at the scheduled date, but first check whether the data they need exists upstream, and if not, the task fails. I have these tasks set to retry at a fixed interval, with a high number of retries (e.g. retry every hour for 24 hours; if the data still isn't there, something is wrong and the DAG fails). This is fine since 1) it works for the most part and 2) I don't believe the failed tasks continue to occupy a worker slot between retries, so it is already somewhat async (I could be wrong). It's just a little crude, so I'm imagining there is a better way.
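Concretely, the check tasks I'm describing look roughly like this (a simplified sketch; partition_exists and the table name are made-up stand-ins for whatever metadata lookup the real tasks do):

# Simplified sketch of the "fail fast and retry" check described above.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def partition_exists(table: str, partition: str) -> bool:
    """Made-up placeholder for the real check against the warehouse metadata."""
    raise NotImplementedError


def check_upstream_partition(ds, **_):
    # `ds` is the run date shared with the upstream DAGs.
    if not partition_exists(table="upstream_table", partition=ds):
        raise ValueError(f"Partition {ds} not available yet in upstream_table")


with DAG(dag_id="downstream_dag", start_date=datetime(2022, 1, 1), schedule_interval="@daily"):
    check = PythonOperator(
        task_id="check_upstream_partition",
        python_callable=check_upstream_partition,
        retries=24,                      # retry every hour for 24 hours, then give up
        retry_delay=timedelta(hours=1),  # the worker slot is released between retries
    )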
Any tactical advice for how to set this relationship up to be more event driven while still benefitting from the async nature of deferrable operators is welcome.
Upvotes: 2
Views: 1155
Reputation: 10693
Starting from Airflow 2.4, you can use data-aware scheduling. It would look like this:

from airflow import DAG, Dataset
from airflow.operators.bash import BashOperator

dag_a_finished = Dataset("dag_a_finished")

with DAG(dag_id="dag_a", ...):
    # Last task in dag_a
    BashOperator(task_id="final", outlets=[dag_a_finished], ...)

with DAG(dag_id="dag_b", schedule=[dag_a_finished], ...):
    ...

with DAG(dag_id="dag_c", schedule=[dag_a_finished], ...):
    ...
In theory a Dataset should represent some piece of data, but technically nothing prevents you from making it just a string identifier used for setting up a DAG dependency, just like in the example above.
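For the fan-in case from the question (dag_c depending on both dag_a and dag_b), the same mechanism extends naturally: give each upstream DAG its own Dataset and schedule dag_c on the list of both. A sketch, with placeholder dataset names, dates and bash commands:

from datetime import datetime

from airflow import DAG, Dataset
from airflow.operators.bash import BashOperator

dag_a_output = Dataset("dag_a_finished")
dag_b_output = Dataset("dag_b_finished")

with DAG(dag_id="dag_a", start_date=datetime(2022, 1, 1), schedule="@daily"):
    BashOperator(task_id="final", bash_command="echo produce partition a", outlets=[dag_a_output])

with DAG(dag_id="dag_b", start_date=datetime(2022, 1, 1), schedule="@daily"):
    BashOperator(task_id="final", bash_command="echo produce partition b", outlets=[dag_b_output])

# A new dag_c run is created only once BOTH datasets have been updated
# since dag_c's previous run, which covers the fan-in case.
with DAG(dag_id="dag_c", schedule=[dag_a_output, dag_b_output], start_date=datetime(2022, 1, 1)):
    BashOperator(task_id="consume", bash_command="echo consume both partitions")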
Upvotes: 1
Reputation: 11
We are using an event bus to connect the DAGs: the final task of each DAG sends an event out, and the downstream DAG is triggered by the orchestrator according to the event type.
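A minimal sketch of what the consuming side could look like (the event schema, bus client, host, and credentials here are all made up; the only concrete piece is Airflow's stable REST API endpoint for creating DAG runs):

# Illustrative consumer for the event-bus pattern described above.
# The bus client itself is omitted; handle_event() would be called for each message.
import requests

AIRFLOW_API = "http://airflow-webserver:8080/api/v1"  # made-up host


def handle_event(event: dict) -> None:
    if event.get("type") == "dag_a_finished":
        # Trigger the downstream DAG for the same logical date as the upstream run.
        response = requests.post(
            f"{AIRFLOW_API}/dags/dag_b/dagRuns",
            json={"logical_date": event["logical_date"]},
            auth=("api_user", "api_password"),  # assumes basic auth is enabled
            timeout=30,
        )
        response.raise_for_status()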
Upvotes: 1