I am creating DAGs dynamically, following the instructions in Dynamically Generating DAGs in Airflow, and I control the number of DAGs to create with a variable `k`:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        # hello_world_py already sees dag_number through its closure, so it
        # is not passed to PythonOperator (which has no dag_number parameter).
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


# build k dags
k = 5
for n in range(1, k + 1):
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'
    dag_number = n

    # Register each DAG as a module-level global so Airflow discovers it.
    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)
```
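The `globals()[dag_id] = ...` assignment is what makes each generated DAG visible: when Airflow parses a DAG file, it collects the `DAG` objects found among the module's top-level attributes. The sketch below imitates only that discovery step in plain Python. `FakeDAG`, `build_module_namespace` and `discover_dags` are hypothetical stand-ins, not Airflow APIs, and the real `DagBag` does considerably more.

```python
class FakeDAG:
    """Hypothetical stand-in for airflow.DAG; it only stores its dag_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def build_module_namespace(k):
    """Mimic the DAG file: register k DAG objects as module-level globals."""
    namespace = {}
    for n in range(1, k + 1):
        dag_id = 'hello_world_{}'.format(n)
        namespace[dag_id] = FakeDAG(dag_id)
    # A non-DAG global, which discovery should ignore.
    namespace['k'] = k
    return namespace


def discover_dags(namespace):
    """Return the sorted dag_ids of every DAG object found in the namespace."""
    return sorted(obj.dag_id for obj in namespace.values()
                  if isinstance(obj, FakeDAG))


print(discover_dags(build_module_namespace(5)))
print(discover_dags(build_module_namespace(3)))
```

A fresh parse only sees what the file defines at that moment, so lowering `k` changes what discovery finds; it does not by itself touch anything Airflow stored from earlier parses.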
I can check the created DAGs with the UI and the CLI. Both are in sync:
```
> airflow dags list
dag_id        | filepath       | owner   | paused
==============+================+=========+=======
hello_world_1 | hello_world.py | airflow | True
hello_world_2 | hello_world.py | airflow | True
hello_world_3 | hello_world.py | airflow | True
hello_world_4 | hello_world.py | airflow | True
hello_world_5 | hello_world.py | airflow | True
```
Now, if I decrease `k` to 3, the CLI lists only 3 DAGs, as expected. However, the UI keeps showing 5 DAGs.
How can I keep the UI in sync with the number of DAGs being created? How can I delete DAGs programmatically in Python? I would like to delete DAGs as easily as I create them.
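One plausible reading of the mismatch, sketched under simplifying assumptions: `airflow dags list` re-parses the DAG files, while the UI lists the DAG records stored in the metadata database, and a parse adds or updates records without removing the ones whose DAG disappeared from the file. The `metadata_db` dict and the `generated_dag_ids` and `sync_parsed_dags` functions below are a hypothetical toy model of that, not Airflow code; the exact behaviour varies between Airflow versions.

```python
def generated_dag_ids(k):
    """dag_ids the DAG file above produces for a given k."""
    return ['hello_world_{}'.format(n) for n in range(1, k + 1)]


def sync_parsed_dags(metadata_db, parsed_ids):
    """Toy model of a parse: upsert parsed DAGs, never delete missing ones."""
    for dag_id in parsed_ids:
        metadata_db[dag_id] = {'is_paused': True}
    return metadata_db


metadata_db = {}
sync_parsed_dags(metadata_db, generated_dag_ids(5))  # first parse, k = 5
cli_view = generated_dag_ids(3)                      # fresh parse after k = 3
sync_parsed_dags(metadata_db, cli_view)
ui_view = sorted(metadata_db)

print(len(cli_view), len(ui_view))            # 3 5
print(sorted(set(ui_view) - set(cli_view)))   # ['hello_world_4', 'hello_world_5']
```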
Here is a way to delete DAGs dynamically.
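```python
from airflow.api.common.experimental.delete_dag import delete_dag
from airflow.models import DagModel
from airflow.utils.session import provide_session

# dag_ids that should exist, e.g. the ones this file currently generates
enabled_dags = []


@provide_session
def get_all_dag_ids(session=None):
    """Return the dag_id of every DAG registered in the metadata database."""
    return [dag.dag_id for dag in session.query(DagModel).all()]


all_dag_ids = get_all_dag_ids()  # all DAGs in the database, from every file

# Caution: this deletes every DAG in the database that is not listed in
# enabled_dags, including DAGs defined in other files.
for dag_id in [d for d in all_dag_ids if d not in enabled_dags]:
    delete_dag(dag_id)
    # Drop the module-level reference too, if this file defined it.
    globals().pop(dag_id, None)
```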
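When Airflow loads DAG files, it only ever adds DAGs to its database; it does not remove DAGs that are no longer defined. To delete them, query all dag_ids from the database, then delete each one that is no longer generated.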
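The selection step on its own is a set difference: every dag_id in the database that the file no longer generates. Because `get_all_dag_ids()` returns the DAGs of every file, a guard such as a dag_id prefix keeps the loop from deleting unrelated DAGs. The helper `stale_dag_ids`, its `prefix` parameter and the `etl_daily` dag_id are hypothetical; in a real DAG file the helper's result would replace the list comprehension that feeds `delete_dag`.

```python
def stale_dag_ids(all_dag_ids, enabled_dag_ids, prefix=None):
    """Return dag_ids present in the database but no longer generated.

    If prefix is given, only dag_ids starting with it are considered, so
    DAGs defined in other files are never selected for deletion.
    """
    enabled = set(enabled_dag_ids)
    return sorted(
        dag_id for dag_id in all_dag_ids
        if dag_id not in enabled
        and (prefix is None or dag_id.startswith(prefix))
    )


# The database still holds five hello_world DAGs plus an unrelated one.
all_ids = ['hello_world_{}'.format(n) for n in range(1, 6)] + ['etl_daily']
enabled = ['hello_world_{}'.format(n) for n in range(1, 4)]  # k = 3

print(stale_dag_ids(all_ids, enabled))
# ['etl_daily', 'hello_world_4', 'hello_world_5']
print(stale_dag_ids(all_ids, enabled, prefix='hello_world_'))
# ['hello_world_4', 'hello_world_5']
```

Without the prefix, `etl_daily` would be selected and deleted even though it belongs to another file.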
Hope this helps.