Jeff Erickson

Reputation: 177

Airflow: Rerunning DAG can't load XCOMs from previous run

Is there a way to persist an XCOM value during re-runs of a DAG step (after clearing the status)?

Below is a simplified version of what I'm trying to accomplish: when a DAG step's status is cleared and the step is re-run, I would like to load the XCom value pushed on the previous run. However, even though I can see the value in the XCom interface, it does not get pulled. I've looked through the source code for the xcom_pull() method but can't figure out where the value is being filtered out.

The functionality I'm trying to achieve is to maintain some amount of state between failed runs of a DAG. In the example, this would mean that 1 is added to the stored value every time the DAG step is cleared and rerun.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def test_step(**kwargs):
    ti = kwargs.get('task_instance')
    value = ti.xcom_pull(key='key', include_prior_dates=True)

    if value is None:
        value = 0
    print(f'BEFORE VALUE: {value}')
    value += 1
    print(f'AFTER VALUE: {value}')

    ti.xcom_push(key='key', value=value)

    # Simulating a failure
    raise Exception


default_args = {
    'owner': 'Testing',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(
    'test_dag',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2020, 4, 9),
)

t1 = PythonOperator(
    task_id='test_step',
    provide_context=True,
    python_callable=test_step,
    dag=dag,
)

t1

Upvotes: 1

Views: 2104

Answers (2)

RainOfData

Reputation: 1

To add to the other answer: the dummy task doesn't actually need to exist in your DAG.

You can push an XCom to a non-existent task_id within your DAG.

For example, in the PythonOperator's python_callable:

XCom.set(key="your_key",
         value=value,
         execution_date=execution_date,
         task_id=task.task_id + "_some_suffix",
         dag_id=dag.dag_id
         )

and later retrieve it using the same suffixed task_id (the pushing task's task_id + "_some_suffix"), like so:

task_instance.xcom_pull(
        task_ids=task_instance.task_id + "_some_suffix",
        key="your_key",
        include_prior_dates=False)

Be careful: this XCom will not get deleted on reruns. It can, however, be overwritten.

Upvotes: -1

Daniel Huang

Reputation: 6548

Anytime a task is about to run, its XCom is cleared for the current execution date (https://github.com/apache/airflow/blob/1.10.10/airflow/models/taskinstance.py#L960). This is why you won't ever pull values from previous task tries. Use of include_prior_dates=True only pulls from previous execution dates, but not previous runs of the same execution date.
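To make that behavior concrete, here is a small pure-Python toy model of it. It is not Airflow code: ToyXComStore and run_task are hypothetical names made up for illustration, and the store only mimics the two rules described above (clear the task's own rows for the current execution date before running, and let include_prior_dates reach only earlier dates).

```python
from datetime import date


class ToyXComStore:
    """Toy stand-in for the XCom table (hypothetical, not an Airflow class)."""

    def __init__(self):
        # Maps (task_id, execution_date, key) -> value
        self.rows = {}

    def clear(self, task_id, execution_date):
        # Drop every row this task wrote for this execution date.
        self.rows = {
            k: v for k, v in self.rows.items()
            if (k[0], k[1]) != (task_id, execution_date)
        }

    def push(self, task_id, execution_date, key, value):
        self.rows[(task_id, execution_date, key)] = value

    def pull(self, task_id, execution_date, key, include_prior_dates=False):
        # Match the current date, plus earlier dates if requested.
        candidates = [
            (d, v) for (t, d, k), v in self.rows.items()
            if t == task_id and k == key
            and (d == execution_date or (include_prior_dates and d < execution_date))
        ]
        if not candidates:
            return None
        # Return the value from the most recent matching date.
        return max(candidates, key=lambda c: c[0])[1]


def run_task(store, task_id, execution_date):
    # Mimics the question's test_step: the clear happens before the pull.
    store.clear(task_id, execution_date)
    value = store.pull(task_id, execution_date, "key", include_prior_dates=True)
    value = (value or 0) + 1
    store.push(task_id, execution_date, "key", value)
    return value
```

In this model, running the task twice for the same date yields 1 both times, because the first run's row is wiped before the second run pulls. Only a run for a later date sees the earlier value.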

One possible solution is to put a DummyOperator task upstream of your test_step task, called, say, xcom_store.test_step. Then use airflow.models.XCom.set() directly in test_step to push your XCom values into the xcom_store.test_step task (see xcom_push() for an example). When you need to pull, pull as you usually would, but from the dummy task instead, i.e. ti.xcom_pull(task_ids='xcom_store.test_step', key='key'). It's definitely not ideal and could lead to some confusion, but if you standardize it and build some helpers around it, it could be alright.
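As a rough sketch of what such helpers might look like, assuming Airflow 1.10.x and the xcom_store.<task_id> naming from above: store_task_id, pull_persisted and push_persisted are hypothetical names, not part of Airflow. The only Airflow calls used are TaskInstance.xcom_pull() and XCom.set(), with the same keyword arguments shown elsewhere in this thread.

```python
STORE_PREFIX = "xcom_store."  # naming convention from this answer, not an Airflow setting


def store_task_id(task_id):
    """Return the task_id of the storage task paired with task_id."""
    return STORE_PREFIX + task_id


def pull_persisted(ti, key, default=None):
    """Pull a value that an earlier try stored under the storage task's id."""
    value = ti.xcom_pull(task_ids=store_task_id(ti.task_id), key=key)
    return default if value is None else value


def push_persisted(ti, key, value):
    """Write a value under the storage task's id so clearing ti does not wipe it."""
    # Imported lazily so the pure helpers above can be used without Airflow installed.
    from airflow.models import XCom

    XCom.set(
        key=key,
        value=value,
        execution_date=ti.execution_date,
        task_id=store_task_id(ti.task_id),
        dag_id=ti.dag_id,
    )
```

Inside test_step this would replace the pull and push calls, e.g. value = pull_persisted(ti, 'key', default=0) followed by push_persisted(ti, 'key', value + 1). One caveat under this assumption: if the storage task itself is cleared and rerun for the same execution date, its XCom rows are presumably wiped just like any other task's.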

Upvotes: 5
