vijay krishna
vijay krishna

Reputation: 283

How to delete XCOM objects once the DAG finishes its run in Airflow

I have a huge json file in the XCOM which later I do not need once the dag execution is finished, but I still see the Xcom Object in the UI with all the data, Is there any way to delete the XCOM programmatically once the DAG run is finished.

Thank you

Upvotes: 21

Views: 25198

Answers (5)

Osmandi
Osmandi

Reputation: 51

My solution to this problem is:

from airflow.utils.db import provide_session
from airflow.models import XCom

dag = DAG(...)

@provide_session
def cleanup_xcom(**context):     
    dag = context["dag"]
    dag_id = dag._dag_id 
    session=context["session"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

clean_xcom = PythonOperator(
    task_id="clean_xcom",
    python_callable = cleanup_xcom,
    provide_context=True, 
    dag=dag
)

clean_xcom

In Airflow 2.1.x, the code below likes not to work ...

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

so change to

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:
    # cleanup_xcom
    @provide_session
    def cleanup_xcom(session=None, **context):
        dag = context["dag"]
        dag_id = dag._dag_id 
        # It will delete all xcom of the dag_id
        session.query(XCom).filter(XCom.dag_id == dag_id).delete()

    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable = cleanup_xcom,
        provide_context=True, 
        # dag=dag
    )
    
    start  = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end", trigger_rule="none_failed")
    
    start >> clean_xcom >> end

Upvotes: 5

laol
laol

Reputation: 464

Using

from sqlalchemy import func 
[...]
session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

to filter by date (as proposed above) did not work for me. Instead I had to provide a datetime (including timezone):

from airflow.models import XCom
from datetime import datetime, timedelta, timezone

[...]

@provide_session
def cleanup_xcom(session=None):
    ts_limit = datetime.now(timezone.utc) - timedelta(days=2)
    session.query(XCom).filter(XCom.execution_date <= ts_limit).delete()
    logging.info(f"deleted all XCOMs older than {ts_limit}")

xcom_cleaner = python_operator.PythonOperator(
    task_id='delete-old-xcoms',
    python_callable=cleanup_xcom)

xcom_cleaner 

Upvotes: 0

Gorka
Gorka

Reputation: 291

You can perform the cleanup programmatically through sqlalchemy so your solution won't break if the database structure changes:

from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()

You can also purge old XCom data:

from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

If you want to purge the XCom once the dag is finished I think the cleanest solution is to use the "on_success_callback" property of the DAG model class:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

Upvotes: 17

Chandan
Chandan

Reputation: 732

Below is the code that worked for me,this will delete xcom of all tasks in DAG(Add task_id to SQL if xcom of only specific task needs to be deleted):

As dag_id is dynamic and dates should follow respective syntax of SQL.

from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
                                            postgres_conn_id='your_conn_id',
                                            sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')"
                                            )

Upvotes: 3

Omar14
Omar14

Reputation: 2055

You have to add a task depends on you metadatadb (sqllite, PostgreSql, MySql..) that delete XCOM once the DAG run is finished.

delete_xcom_task = PostgresOperator(
      task_id='delete-xcom-task',
      postgres_conn_id='airflow_db',
      sql="delete from xcom where dag_id=dag.dag_id and 
           task_id='your_task_id' and execution_date={{ ds }}",
      dag=dag)

You can verify your query before you run the dag.

Data Profiling -> Ad Hoc Query -> airflow_db -> query -> Run!

xcom metadata

Upvotes: 12

Related Questions