Reputation: 1939
I need to create a DAG that deletes from and updates a few different tables. The updates happen by region. The database I work with takes a table lock when doing any deletes or updates, so I need to structure my DAG like the one below to avoid trying to update the same table at the same time.
(--> means "dependent on")
Florida_table_1 --> Carolina_table_1 --> Texas_table_1
Florida_table_2 --> Carolina_table_2 --> Texas_table_2
Florida_table_3 --> Carolina_table_3 --> Texas_table_3
Worst comes to worst, I can write out all the tasks separately, but I was wondering if there is a smart way to do it dynamically?
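For what it's worth, the grid of task names above doesn't have to be written out by hand; it can be enumerated from two small lists (plain Python, no Airflow involved; the `states` and `tables` names here are just illustrative):

```python
# Within each table, the three state tasks form one sequential chain;
# the three tables' chains are independent of each other.
states = ["Florida", "Carolina", "Texas"]
tables = ["table_1", "table_2", "table_3"]

chains = [[f"{state}_{table}" for state in states] for table in tables]

for chain in chains:
    print(" --> ".join(chain))
# Florida_table_1 --> Carolina_table_1 --> Texas_table_1
# Florida_table_2 --> Carolina_table_2 --> Texas_table_2
# Florida_table_3 --> Carolina_table_3 --> Texas_table_3
```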
Upvotes: 0
Views: 188
Reputation: 4366
I would do something like the following:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

list_of_states = ["Alabama", "Alaska", "Arizona", ...]  # I forgot the song...

def state_task(which_state):
    print(f"Working on {which_state}!")
    [...]

with DAG(dag_id="states_process", ...) as dag:
    prior_task = the_start = DummyOperator(task_id="the_start")
    for which_state in list_of_states:
        prior_task = prior_task >> PythonOperator(
            task_id=f"{which_state}_task",
            python_callable=state_task,
            op_args=(which_state,),
        )
This is off the top of my head, but the concept is to leverage Airflow's >> syntax, which both declares the dependency and returns the downstream task, which we save off to use as the upstream of the next one: prior_task = prior_task >> PythonOperator(...)
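The reassignment trick works because Airflow's `>>` (i.e. `BaseOperator.__rshift__`) sets the downstream relationship and then returns the right-hand operand. A minimal stand-in (a toy class, not Airflow itself) demonstrates the mechanics, extended with a nested loop to the asker's several-tables layout; the `states` and `tables` lists are illustrative:

```python
class Task:
    """Toy stand-in for an Airflow operator: `>>` records the downstream
    task and returns it, mirroring BaseOperator.__rshift__."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other  # returning the right-hand side enables `prior = prior >> Task(...)`

states = ["Florida", "Carolina", "Texas"]
tables = ["table_1", "table_2", "table_3"]

start = Task("the_start")
for table in tables:
    prior = start  # each table gets its own independent chain off the start task
    for state in states:
        prior = prior >> Task(f"{state}_{table}")

# The start task fans out into one sequential chain per table.
print([t.task_id for t in start.downstream])
```

Because each table's chain is independent, the per-table updates can run in parallel while the states within a table stay strictly sequential, which is the locking behavior the question needs.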
Upvotes: 1