pgrandjean

Reputation: 766

How to dynamically delete DAGs?

I am creating DAGs dynamically, following the instructions in Dynamically Generating DAGs in Airflow, and controlling the number of DAGs created with a variable k:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        # dag_number reaches the callable through the closure above;
        # PythonOperator itself has no dag_number argument
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


# build k dags
k = 5
for n in range(1, k + 1):
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'

    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

I can check the created DAGs with the UI and the CLI. Both are in sync:

> airflow dags list
dag_id        | filepath       | owner   | paused
==============+================+=========+=======
hello_world_1 | hello_world.py | airflow | True
hello_world_2 | hello_world.py | airflow | True
hello_world_3 | hello_world.py | airflow | True
hello_world_4 | hello_world.py | airflow | True
hello_world_5 | hello_world.py | airflow | True

Now, if I decrease k to 3, the CLI lists only 3 DAGs as expected. However, the UI keeps showing 5 DAGs.

How can I keep the UI in sync with the number of DAGs to be created? How can I delete DAGs programmatically in Python? I would like to delete DAGs as easily as I create them.

Upvotes: 2

Views: 1661

Answers (1)

Z.wk

Reputation: 49

Here is a way to delete DAGs dynamically.

from airflow.api.common.experimental.delete_dag import delete_dag
from airflow.utils.session import provide_session
from airflow.models import DagModel

enabled_dags = []  # dag_ids that should currently exist (filled in dynamically)

@provide_session
def get_all_dag_ids(session=None):
    all_objs = session.query(DagModel).all()
    return [i.dag_id for i in all_objs]

all_dag_ids = get_all_dag_ids() # all dag in database

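# Delete every DAG recorded in the database that is no longer enabled
for k in [gk for gk in all_dag_ids if gk not in enabled_dags]:
    delete_dag(k)
    globals().pop(k, None)  # the DAG may no longer be defined in this module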

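Airflow only ever adds the DAGs it loads from DAG files to the database; it does not remove them when they disappear from the file. To delete them, query all dag_ids from the database, then delete every one that is no longer enabled.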
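The selection step can be tried out without an Airflow installation. The following is a minimal sketch, not part of the original answer: the helper names expected_dag_ids and stale_dag_ids are hypothetical, and the prefix filter is an added assumption so that DAGs defined in other files are never selected. The code above, as written, selects every DAG in the database that is missing from enabled_dags.

```python
def expected_dag_ids(k, prefix="hello_world_"):
    """IDs the generator loop in the question would create for a given k."""
    return [f"{prefix}{n}" for n in range(1, k + 1)]


def stale_dag_ids(all_dag_ids, enabled_dags, prefix="hello_world_"):
    """IDs present in the database that the generator no longer produces.

    Restricting to a prefix is an assumption added here so that DAGs
    defined in other files are never selected for deletion.
    """
    enabled = set(enabled_dags)
    return sorted(
        dag_id for dag_id in all_dag_ids
        if dag_id.startswith(prefix) and dag_id not in enabled
    )


# Example: the database still holds five DAGs, but k was lowered to 3.
in_database = [f"hello_world_{n}" for n in range(1, 6)] + ["unrelated_dag"]
to_delete = stale_dag_ids(in_database, expected_dag_ids(3))
print(to_delete)  # ['hello_world_4', 'hello_world_5']
```

In the DAG file, the result of stale_dag_ids(get_all_dag_ids(), enabled_dags) would replace the list comprehension in the loop above.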

Hope this helps.

Upvotes: 2
