Brendan

Reputation: 2075

Airflow Deferrable Operator Pattern for Event-driven DAGs

I'm looking for examples of patterns in place for event-driven DAGs, specifically those with dependencies on other DAGs. Let's start with a simple example:

dag_a -> dag_b

dag_b depends on dag_a. I understand that at the end of dag_a I can add a trigger to launch dag_b. However, this philosophically feels misaligned from an abstraction standpoint: dag_a does not need to know that dag_b exists, yet this pattern puts the responsibility of calling dag_b onto dag_a.
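
For reference, the trigger pattern I mean looks roughly like this (a minimal sketch; schedules, task ids and commands are placeholders):

from pendulum import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(dag_id="dag_a", start_date=datetime(2022, 1, 1), schedule_interval="@daily") as dag:
    work = BashOperator(task_id="work", bash_command="echo 'produce partition'")

    # dag_a now has to know that dag_b exists, which is the coupling I'd like to avoid
    trigger_b = TriggerDagRunOperator(task_id="trigger_dag_b", trigger_dag_id="dag_b")

    work >> trigger_b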

Let's consider a slightly more complex example (pardon my poor ASCII drawing skills):

dag_a ------> dag_c
         /
dag_b --/

In this case, dag_c depends on both dag_a and dag_b. I understand that we could set up a sensor for the output of each of dag_a and dag_b (sketched below), but with the advent of deferrable operators, it doesn't seem that this remains a best practice. I suppose I'm wondering how to set up a DAG of DAGs in an async fashion.
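
For clarity, the sensor approach I'm referring to would be something like the following in dag_c (a rough sketch, using mode="reschedule" so the worker slot is freed between pokes; ids and intervals are placeholders):

from pendulum import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(dag_id="dag_c", start_date=datetime(2022, 1, 1), schedule_interval="@daily") as dag:
    wait_for_a = ExternalTaskSensor(
        task_id="wait_for_dag_a",
        external_dag_id="dag_a",
        external_task_id=None,   # wait for the whole DAG run, not a single task
        mode="reschedule",       # free the worker slot between pokes
        poke_interval=600,
        timeout=24 * 60 * 60,
    )
    wait_for_b = ExternalTaskSensor(
        task_id="wait_for_dag_b",
        external_dag_id="dag_b",
        external_task_id=None,
        mode="reschedule",
        poke_interval=600,
        timeout=24 * 60 * 60,
    )
    consume = BashOperator(task_id="consume", bash_command="echo 'read upstream partitions'")

    [wait_for_a, wait_for_b] >> consume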

The potential of deferrable operators for event-driven DAGs is introduced in Astronomer's guide here: https://www.astronomer.io/guides/deferrable-operators, but it's unclear how they would best be applied to the examples above.

More concretely, I'm envisioning a use case where multiple DAGs run every day (so they share the same run date), and the output of each DAG is a date partition in a table somewhere. Downstream DAGs consume the partitions of the upstream tables, so we want to schedule them such that downstream DAGs don't attempt to run before the upstream ones complete.

Right now I'm using a "fail fast and often" approach in downstream DAGs: they start running at the scheduled date, but their first task checks whether the data they need exists upstream, and if it doesn't the task fails. I have these tasks set to retry every x interval with a high number of retries (e.g. retry every hour for 24 hours; if the data still isn't there, something is wrong and the DAG fails). This is fine since 1) it works for the most part and 2) I don't believe the failed tasks continue to occupy a worker slot between retries, so it actually is somewhat async (I could be wrong). It's just a little crude, so I'm imagining there is a better way.
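
Concretely, the first task of a downstream DAG looks roughly like this (the partition_exists helper is a stand-in for whatever metadata check is actually used):

from datetime import timedelta

from pendulum import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def partition_exists(table: str, partition: str) -> bool:
    # Placeholder for the real metadata check (Hive metastore, BigQuery, etc.)
    return False


def check_upstream_partition(ds, **_):
    # Fail the task if the upstream date partition isn't there yet; the retry
    # policy below will re-run the check later.
    if not partition_exists(table="upstream_table", partition=ds):
        raise ValueError(f"Partition {ds} not available yet, failing so the retry kicks in")


with DAG(dag_id="dag_c", start_date=datetime(2022, 1, 1), schedule_interval="@daily") as dag:
    check = PythonOperator(
        task_id="check_upstream_partition",
        python_callable=check_upstream_partition,
        retries=24,                      # try for roughly a day...
        retry_delay=timedelta(hours=1),  # ...once per hour
    )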

Any tactical advice on how to set up this relationship to be more event-driven while still benefiting from the async nature of deferrable operators is welcome.

Upvotes: 2

Views: 1155

Answers (2)

Kombajn zbożowy

Reputation: 10693

Starting from Airflow 2.4, you can use data-aware scheduling (Datasets). It would look like this:

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

dag_a_finished = Dataset("dag_a_finished")

with DAG(dag_id="dag_a", ...):
    # Last task in dag_a
    BashOperator(task_id="final", outlets=[dag_a_finished], ...)

with DAG(dag_id="dag_b", schedule=[dag_a_finished], ...):
    ...

with DAG(dag_id="dag_c", schedule=[dag_a_finished], ...):
    ...

In theory a Dataset should represent some piece of data, but technically nothing prevents you from making it just a string identifier used to set up a DAG dependency, just like in the example above.
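
For the fan-in case from the question (dag_c waiting on both dag_a and dag_b), something like this should work: give dag_b its own outlet Dataset and schedule dag_c on both, so dag_c runs once each Dataset has been updated since its last run:

dag_a_finished = Dataset("dag_a_finished")
dag_b_finished = Dataset("dag_b_finished")

with DAG(dag_id="dag_a", ...):
    BashOperator(task_id="final", outlets=[dag_a_finished], ...)

with DAG(dag_id="dag_b", ...):
    BashOperator(task_id="final", outlets=[dag_b_finished], ...)

with DAG(dag_id="dag_c", schedule=[dag_a_finished, dag_b_finished], ...):
    ...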

Upvotes: 1

Angas Liu

Reputation: 11

We are using an event bus to connect the DAGs: the final task of each DAG sends an event out, and the downstream DAG is triggered in the orchestrator according to the event type.
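
A minimal sketch of the consuming side, assuming the orchestrator triggers the downstream DAG through Airflow's stable REST API (the host, credentials and event-to-DAG mapping here are placeholders):

import requests

# Hypothetical consumer: the orchestrator maps an event type to the DAG it should
# start and triggers it via POST /api/v1/dags/{dag_id}/dagRuns.
EVENT_TO_DAG = {"dag_a_finished": "dag_b"}


def handle_event(event: dict) -> None:
    dag_id = EVENT_TO_DAG[event["type"]]
    response = requests.post(
        f"http://airflow-webserver:8080/api/v1/dags/{dag_id}/dagRuns",
        json={"conf": {"triggering_event": event}},
        auth=("user", "password"),  # placeholder; depends on the configured API auth backend
    )
    response.raise_for_status()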

Upvotes: 1
