smullan

Reputation: 152

Airflow - Using an upstream task for multiple downstream tasks

Okay, I apologize if this is a dumb question, because it seems so obvious that it should work. But I can't find it documented, and as we examine our options for building a new data pipeline, I really want this to be a feature...

Can multiple downstream processes be dependent on a single upstream process, where the upstream process only runs once? In other words, can I extract a table one time, load it into my data warehouse, and have multiple aggregations that depend on that load being complete?

For a bit more information: we are attempting to move to an asynchronous extract-load-transform, where the extract is started and then the loads and transforms finish as soon as they have the subset of tables they need from the extract.

Upvotes: 2

Views: 6593

Answers (2)

Joshua Bonomini

Reputation: 111

If I'm understanding the question, yes, you can set downstream tasks to be dependent on the success of an upstream task.

We use DummyOperators in a lot of cases, similar to this sample DAG: (image: a sample DAG)

In this case, we want the DummyOperator to kick off first and do something before the downstream tasks kick off. This also makes clearing out failed runs easier, since we can simply clear the dummy operator and the downstream tasks at the same time.
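The fan-out pattern can be illustrated without Airflow at all. Below is a minimal toy scheduler (not Airflow itself; all task names are made up for the example) that runs each task only after every one of its upstream tasks has finished, so a single extract runs once and all of its dependents wait on it:

```python
# Toy illustration of one upstream task fanning out to several downstream
# tasks. Each task runs only after all of its upstream tasks have run.
from collections import deque

def run_dag(dependencies):
    """Run tasks in topological order; returns the execution order.

    `dependencies` maps each task to the set of upstream tasks it waits on.
    """
    indegree = {task: len(ups) for task, ups in dependencies.items()}
    downstream = {task: [] for task in dependencies}
    for task, ups in dependencies.items():
        for up in ups:
            downstream[up].append(task)

    ready = deque(sorted(t for t, d in indegree.items() if d == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in downstream[task]:
            indegree[child] -= 1
            if indegree[child] == 0:   # all upstream tasks done -> runnable
                ready.append(child)
    return order

# One extract, one load, two aggregations that both depend on the load.
deps = {
    "extract_table": set(),
    "load_warehouse": {"extract_table"},
    "agg_daily": {"load_warehouse"},
    "agg_monthly": {"load_warehouse"},
}
print(run_dag(deps))
```

The key property is that the extract appears exactly once in the run order, even though two tasks depend (transitively) on it.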

Note that by default a task only runs once all of its upstream tasks have succeeded (the all_success trigger rule); with other trigger rules, downstream tasks can run or be skipped based on what happened upstream. The depends_on_past=True parameter is separate: it requires the task's own instance in the previous DAG run to have succeeded before the task is queued.

Upvotes: 3

tobi6

Reputation: 8249

This seems to me like a usual DAG with unusual wording. I understand the required structure like this:

extract_table_task \
                   |- task1_do_stuff
                   |- task2_do_some_other_stuff
                   |- task3_...

Or in Airflow code:

extract_table_task.set_downstream(task1_do_stuff)
extract_table_task.set_downstream(task2_do_some_other_stuff)
extract_table_task.set_downstream(task3_...)

Then make sure to select the correct trigger rules for your workflow, e.g. if some tasks should run even if something went wrong: https://airflow.apache.org/concepts.html#trigger-rules
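Putting the pieces together, here is a sketch of what the whole DAG file could look like (Airflow 1.x-era imports and syntax; the DAG id, schedule, and the cleanup task are illustrative additions, and the bitshift operators are equivalent to the set_downstream() calls above):

```python
# Illustrative DAG file: one extract task fanning out to multiple
# downstream tasks, plus a cleanup task with a non-default trigger rule.
# All ids and the schedule are made up for the example.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(dag_id="extract_then_fan_out",
         start_date=datetime(2018, 1, 1),
         schedule_interval="@daily") as dag:

    extract_table_task = DummyOperator(task_id="extract_table")
    task1_do_stuff = DummyOperator(task_id="task1_do_stuff")
    task2_do_some_other_stuff = DummyOperator(task_id="task2_do_some_other_stuff")

    # Runs even if an upstream task failed ("all_done" trigger rule).
    cleanup = DummyOperator(task_id="cleanup", trigger_rule="all_done")

    # One upstream, many downstream -- same as the set_downstream() calls.
    extract_table_task >> task1_do_stuff
    extract_table_task >> task2_do_some_other_stuff
    task1_do_stuff >> cleanup
    task2_do_some_other_stuff >> cleanup
```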

Upvotes: 3
