Is there a way to persist an XCom value across re-runs of a DAG step (after clearing its status)?
Below is a simplified version of what I'm trying to accomplish: when a DAG step's status is cleared and the step is re-run, I would like to load the XCom value pushed on the previous run. However, even though I can see the value in the XCom UI, it does not get pulled. I've looked through the source code of the xcom_pull() method but can't figure out where it is being filtered out.
The functionality I'm trying to achieve is to maintain some state between failed runs of a DAG. In the example, this would mean that 1 is added to the stored value every time the DAG step is cleared and re-run.
```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def test_step(**kwargs):
    ti = kwargs.get('task_instance')
    value = ti.xcom_pull(key='key', include_prior_dates=True)
    if value is None:
        value = 0
    print(f'BEFORE VALUE: {value}')
    value += 1
    print(f'AFTER VALUE: {value}')
    ti.xcom_push(key='key', value=value)
    # Simulating a failure
    raise Exception


default_args = {
    'owner': 'Testing',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(
    'test_dag',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2020, 4, 9),
)

t1 = PythonOperator(
    task_id='test_step',
    provide_context=True,
    python_callable=test_step,
    dag=dag,
)

t1
```
To add to the DummyOperator answer: the dummy task doesn't actually need to exist in your DAG. You can push an XCom to a non-existent task_id within your DAG, e.g. in a PythonOperator's python_callable:
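```python
from airflow.models import XCom

# Inside the python_callable; value, execution_date, task and dag
# come from your callable and its task context.
XCom.set(
    key="your_key",
    value=value,
    execution_date=execution_date,
    task_id=task.task_id + "_some_suffix",
    dag_id=dag.dag_id,
)
```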
and later retrieve it from task_id="previous_task_id" + "_some_suffix", e.g.:
```python
task_instance.xcom_pull(
    task_ids=task_instance.task_id + "_some_suffix",
    key="your_key",
    include_prior_dates=False,
)
```
Be careful: this XCom will not get deleted on re-runs. It can, however, be overridden.
Any time a task is about to run, its XCom for the current execution date is cleared (https://github.com/apache/airflow/blob/1.10.10/airflow/models/taskinstance.py#L960). This is why you will never pull values from previous tries of the same task. Using include_prior_dates=True only lets you pull from earlier execution dates; it cannot recover values from previous runs of the same execution date, because those were deleted before the task started.
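To make the mechanism concrete, here is a toy, pure-Python model of the behaviour described above. It is not Airflow code: the dict-based table, `xcom_get_one` and `run_test_step` are hypothetical stand-ins that mimic only two things, namely that a task's own XCom rows for the current execution date are deleted before it runs, and that `include_prior_dates=True` is assumed to match execution dates up to and including the current one.

```python
from datetime import date
from typing import Any, Dict, Optional, Tuple

# Hypothetical in-memory stand-in for the XCom table (NOT Airflow's implementation):
# (dag_id, task_id, execution_date, key) -> value
Row = Tuple[str, str, date, str]


def xcom_get_one(table: Dict[Row, Any], dag_id: str, task_id: str,
                 execution_date: date, key: str,
                 include_prior_dates: bool = False) -> Optional[Any]:
    """Return the value for the latest matching execution date, or None."""
    matches = [
        (row[2], value)
        for row, value in table.items()
        if row[0] == dag_id and row[1] == task_id and row[3] == key
        and (row[2] <= execution_date if include_prior_dates
             else row[2] == execution_date)
    ]
    if not matches:
        return None
    # The most recent execution date wins
    return max(matches, key=lambda m: m[0])[1]


def run_test_step(table: Dict[Row, Any], execution_date: date,
                  dag_id: str = "test_dag", task_id: str = "test_step") -> int:
    # 1. Before the callable runs, the task's own XCom for this date is wiped.
    for row in [r for r in table if r[:3] == (dag_id, task_id, execution_date)]:
        del table[row]
    # 2. The callable from the question: pull (with prior dates), add 1, push.
    value = xcom_get_one(table, dag_id, task_id, execution_date, "key",
                         include_prior_dates=True)
    if value is None:
        value = 0
    value += 1
    table[(dag_id, task_id, execution_date, "key")] = value
    return value
```

In this model, calling `run_test_step(table, date(2020, 4, 9))` three times returns 1 each time, because every retry starts from an empty slot; a subsequent run for `date(2020, 4, 10)` returns 2, since it picks up the 2020-04-09 value through the prior-dates filter.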
One possible solution is to put a DummyOperator task upstream of your test_step task, called, say, xcom_store.test_step. Then use airflow.models.XCom.set() directly in test_step to write your XCom values into the xcom_store.test_step task (use the source of xcom_push() as an example of the call). When you need to pull, pull as you usually would, but from the dummy task instead, i.e. ti.xcom_pull(task_ids='xcom_store.test_step', key='key'). It's definitely not ideal and could lead to some confusion, but if you standardize it and build some helpers around it, it could be alright.
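The helper idea can be sketched with a similar toy table. Again this is a hypothetical, Airflow-free model: `store_task_id`, `push_persistent`, `pull_persistent` and `retry_test_step` are illustrative names, and in a real DAG the push would go through `airflow.models.XCom.set()` and the pull through `ti.xcom_pull(task_ids=...)` as described above. What it shows is that the pre-run clearing only deletes rows for the task's own task_id, so values written under the store task id survive retries.

```python
from datetime import date
from typing import Any, Dict, Optional, Tuple

# Hypothetical in-memory XCom table: (dag_id, task_id, execution_date, key) -> value.
# This only mimics the behaviour described above; it is not Airflow's XCom model.
Row = Tuple[str, str, date, str]

STORE_PREFIX = "xcom_store."  # naming convention from the answer; any prefix would do


def store_task_id(task_id: str) -> str:
    """Name of the (dummy) task that holds persistent values for task_id."""
    return STORE_PREFIX + task_id


def clear_task_xcom(table: Dict[Row, Any], dag_id: str, task_id: str,
                    execution_date: date) -> None:
    """Mimic the task's own XCom being wiped right before it runs."""
    for row in [r for r in table if r[:3] == (dag_id, task_id, execution_date)]:
        del table[row]


def push_persistent(table: Dict[Row, Any], dag_id: str, task_id: str,
                    execution_date: date, key: str, value: Any) -> None:
    # Write under the store task id, so clearing task_id's XCom leaves it alone
    table[(dag_id, store_task_id(task_id), execution_date, key)] = value


def pull_persistent(table: Dict[Row, Any], dag_id: str, task_id: str,
                    execution_date: date, key: str) -> Optional[Any]:
    return table.get((dag_id, store_task_id(task_id), execution_date, key))


def retry_test_step(table: Dict[Row, Any], dag_id: str = "test_dag",
                    task_id: str = "test_step",
                    execution_date: date = date(2020, 4, 9)) -> int:
    clear_task_xcom(table, dag_id, task_id, execution_date)  # happens on every (re)run
    value = pull_persistent(table, dag_id, task_id, execution_date, "key")
    if value is None:
        value = 0
    value += 1
    push_persistent(table, dag_id, task_id, execution_date, "key", value)
    return value
```

With an empty `table = {}`, three consecutive calls to `retry_test_step(table)` return 1, 2 and 3, which is the counter behaviour the question asks for.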