Pablo

Reputation: 3514

Delete a DAG in google composer - Airflow UI

I want to delete a DAG from the Airflow UI that is no longer available in the GCS/dags folder. I know that Airflow has a "new" way to remove DAGs from the DB using the airflow delete_dag my_dag_id command, as seen in https://stackoverflow.com/a/49683543/5318634

It seems that the Airflow version used by Composer does not yet support the delete_dag command.

Do not try this: I also tried airflow resetdb, and the Airflow UI died.

Is there a way to delete the DAGs that are not currently in the gs://BUCKET/dags/ folder?

Upvotes: 5

Views: 4145

Answers (3)

Akshay Lande

Reputation: 87

This takes two steps.

Step 1: Delete the airflow_monitoring.py file from the storage bucket with this command:

gcloud composer environments storage dags delete --environment viu-etl-prod-composer --location us-central1 airflow_monitoring.py

Step 2: Click the red cross, as shown in the picture.

enter image description here

Upvotes: 0

Bikram

Reputation: 96

I created a DAG to clean up the UI. It reads the ID of the DAG to delete from the Airflow Variable afDagID.

from datetime import datetime

from airflow import DAG
from airflow import models
from airflow.operators.mysql_operator import MySqlOperator

# Manually triggered DAG (schedule_interval=None). Each task below deletes the
# rows for the DAG ID stored in the Airflow Variable 'afDagID' from one table.
dag = DAG('ManageAirFlow', description='Deletes Airflow DAGs from backend: Uses vars-  afDagID',
      schedule_interval=None,
      start_date=datetime(2018, 3, 20), catchup=False)

DeleteXComOperator = MySqlOperator(
  task_id='delete-xcom-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from xcom where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteTaskOperator = MySqlOperator(
  task_id='delete-task-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from task_instance where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteSLAMissOperator = MySqlOperator(
  task_id='delete-sla-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from sla_miss where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteLogOperator = MySqlOperator(
  task_id='delete-log-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from log where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteJobOperator = MySqlOperator(
  task_id='delete-job-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from job where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteDagRunOperator = MySqlOperator(
  task_id='delete-dag_run-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from dag_run where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteDagOperator = MySqlOperator(
  task_id='delete-dag-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from dag where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)



DeleteXComOperator >> DeleteTaskOperator >> DeleteSLAMissOperator >> DeleteLogOperator >> DeleteJobOperator >> DeleteDagRunOperator >> DeleteDagOperator
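
The seven tasks above differ only in the table name, so the same cleanup can be sketched as a loop over a fixed table list, with the DAG ID passed as a bound parameter instead of being formatted into the SQL string. This is only an illustration, not a drop-in replacement: it uses Python's built-in sqlite3 with an in-memory database as a stand-in for the Airflow metadata database (an assumption; the DAG above talks to MySQL through the airflow_db connection), and the names TABLES, delete_dag_rows and demo are hypothetical.

```python
import sqlite3

# Same tables, in the same order, as the tasks in the DAG above (dag row last).
TABLES = ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]


def delete_dag_rows(conn, dag_id):
    """Delete every row belonging to dag_id; return {table: rows_deleted}."""
    deleted = {}
    for table in TABLES:
        # Table names come from the fixed list above. Only dag_id is external
        # input, and it is bound as a parameter rather than formatted in.
        cur = conn.execute(
            "DELETE FROM {} WHERE dag_id = ?".format(table), (dag_id,)
        )
        deleted[table] = cur.rowcount
    conn.commit()
    return deleted


def demo():
    # In-memory stand-in for the metadata database (illustration only).
    conn = sqlite3.connect(":memory:")
    for table in TABLES:
        conn.execute("CREATE TABLE {} (dag_id TEXT)".format(table))
        conn.execute("INSERT INTO {} VALUES ('old_dag')".format(table))
        conn.execute("INSERT INTO {} VALUES ('live_dag')".format(table))
    deleted = delete_dag_rows(conn, "old_dag")
    remaining = conn.execute("SELECT dag_id FROM dag").fetchall()
    return deleted, remaining
```

Calling demo() removes the old_dag rows from every table and leaves the live_dag rows in place. Against a real metadata database the table list would need to match the schema of the Airflow version in use.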

Upvotes: 8

kaxil

Reputation: 18824

Because Cloud Composer uses the latest stable version of Airflow, 1.9.0, the feature to delete a DAG is not available.

However, the docs include a few instructions for deleting a DAG:

 gcloud beta composer environments storage dags delete \
     --environment ENVIRONMENT_NAME \
     --location LOCATION \
     DAG_NAME.py 

Unfortunately, this does not remove the DAG from the Airflow web interface.

More info: https://cloud.google.com/composer/docs/how-to/using/managing-dags#deleting_a_dag
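
If the file deletion needs to be scripted, one option is to wrap the gcloud command above in Python. The following is a minimal sketch, assuming the gcloud CLI is installed and authenticated on the machine running it; the function names build_delete_command and delete_dag_file are hypothetical, and the arguments are exactly those of the command shown above. As noted, this only removes the file from the bucket, not the DAG entry in the web interface.

```python
import subprocess


def build_delete_command(environment, location, dag_file):
    """Return the argument list for the gcloud delete command shown above."""
    return [
        "gcloud", "beta", "composer", "environments", "storage", "dags", "delete",
        "--environment", environment,
        "--location", location,
        dag_file,
    ]


def delete_dag_file(environment, location, dag_file):
    # Runs gcloud; raises subprocess.CalledProcessError if the command fails.
    subprocess.run(build_delete_command(environment, location, dag_file), check=True)
```

Passing the arguments as a list avoids shell quoting problems with environment or file names.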

Upvotes: 2
