sanjayr

Reputation: 1939

Airflow: Dynamic Task dependent on itself completing first

I need to create a DAG that deletes and updates a few different tables. The updates happen by region. The database I work with takes a table lock when doing any deletes or updates, so I need to structure my DAG like the example below to avoid trying to update the same table at the same time.

--> means the task on the right depends on the task on the left completing

Florida_table_1 --> Carolina_table_1 --> Texas_table_1
Florida_table_2 --> Carolina_table_2 --> Texas_table_2
Florida_table_3 --> Carolina_table_3 --> Texas_table_3

Worst comes to worst, I can write out all the tasks separately, but I was wondering if there is a smart way to do this dynamically?

Upvotes: 0

Views: 188

Answers (1)

joebeeson

Reputation: 4366

I would do something like the following:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

list_of_states = ["Alabama", "Alaska", "Arizona", ...]  # I forgot the song...

def state_task(which_state):
    print(f"Working on {which_state}!")
    [...]


with DAG(dag_id="states_process", ...) as dag:
    prior_task = the_start = DummyOperator(task_id="the_start")
    for which_state in list_of_states:
        prior_task = prior_task >> PythonOperator(
            task_id=f"{which_state}_task",
            python_callable=state_task,
            op_args=(which_state,)
        )

This is off the top of my head, but the concept is to leverage Airflow's >> syntax: it declares the upstream dependency and also returns the downstream task, which we save off to use as the upstream of the next one, hence prior_task = prior_task >> PythonOperator.
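To match your layout exactly, where each table forms its own chain across regions and the three chains can run in parallel, you could nest the same pattern. A minimal sketch, assuming hypothetical names (list_of_regions, list_of_tables, process_table) and placeholder DAG arguments you would replace with your own:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical lists for illustration; swap in your real regions and tables.
list_of_regions = ["Florida", "Carolina", "Texas"]
list_of_tables = ["table_1", "table_2", "table_3"]

def process_table(region, table):
    # Placeholder for the real delete/update logic.
    print(f"Updating {table} for {region}!")

with DAG(
    dag_id="regional_table_updates",  # assumed name
    start_date=datetime(2021, 1, 1),  # assumed start date
    schedule_interval=None,
) as dag:
    for table in list_of_tables:
        prior_task = None
        for region in list_of_regions:
            task = PythonOperator(
                task_id=f"{region}_{table}",
                python_callable=process_table,
                op_args=(region, table),
            )
            if prior_task is not None:
                prior_task >> task  # serialize updates to the same table
            prior_task = task

Each table gets its own chain, so different tables can be updated in parallel while two updates to the same table never overlap.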

Upvotes: 1
