Reputation: 9128
I have Slack alerts for when tasks fail, but I also want to send recovery messages.
When the task initially fails, its on_failure_callback does an xcom_push. What I save here is available in the next DAG run using:
    context['ti'].xcom_pull(key='my_task_state',
                            task_ids=context['task'].task_id,
                            include_prior_dates=True)
However, if I clear the failed task so that it re-runs, in its on_failure_callback/on_success_callback I try this to get the value I saved in the initial attempt:
    context['ti'].xcom_pull(key='my_task_state',
                            task_ids=context['task'].task_id,
                            include_prior_dates=False)
This returns None. If I set include_prior_dates=True it'll return the value from the previous DAG run, but not the current one where the task was cleared.
Am I doing something wrong, or is there a workaround that I can use to get the XCom value I’m looking for?
Upvotes: 3
Views: 4122
Reputation: 9128
Yong Wang's answer explains really well why I wasn't able to get the values I wanted. I was able to come up with a workaround however.
xcom_push and xcom_pull both just call class methods on XCom. You can call these directly. As it turns out, you can use a made-up task id and the value will be saved to the xcom table under that id. Since it's not a real task, it won't be deleted when the task (or the DAG) is cleared.
    from airflow.models import XCom


    def set_xcom(context, value):
        # Store the value under a made-up task id so that clearing the real task leaves it alone.
        XCom.set(key='my_key',
                 value=value,
                 task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
                 dag_id=context['ti'].dag_id,
                 execution_date=context['ti'].execution_date)


    def get_xcom(context):
        # Read back the value saved for the current execution date only.
        return XCom.get_one(context['ti'].execution_date,
                            key='my_key',
                            task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
                            dag_id=context['ti'].dag_id,
                            include_prior_dates=False)
It's not the standard way of using XCom, so I'll just have to be careful when upgrading to a new version of Airflow in the future.
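For the recovery alerts themselves, the wiring could look roughly like the sketch below. It is only an illustration: build_callbacks, notify and the 'failed'/'ok' markers are hypothetical names, and the state helpers are passed in as arguments (they would be set_xcom and get_xcom from above) so the logic can be tried without Airflow, using a plain dict in place of the xcom table.

```python
from types import SimpleNamespace


def build_callbacks(set_state, get_state, notify):
    """Return (on_failure, on_success) callbacks for one task.

    set_state(context, value) and get_state(context) are assumed to behave
    like set_xcom / get_xcom; notify(message) stands in for the Slack call.
    All of these names are hypothetical.
    """
    def on_failure(context):
        notify('FAILED: {}'.format(context['ti'].task_id))
        # Remember the failure so a later successful attempt can report recovery.
        set_state(context, 'failed')

    def on_success(context):
        # A stored 'failed' marker means an earlier attempt of this run failed.
        if get_state(context) == 'failed':
            notify('RECOVERED: {}'.format(context['ti'].task_id))
            set_state(context, 'ok')

    return on_failure, on_success


# Demo without Airflow: a dict stands in for the xcom table.
store = {}
messages = []
ctx = {'ti': SimpleNamespace(task_id='load_data')}

on_failure, on_success = build_callbacks(
    set_state=lambda c, v: store.__setitem__(c['ti'].task_id, v),
    get_state=lambda c: store.get(c['ti'].task_id),
    notify=messages.append,
)

on_failure(ctx)   # first attempt fails
on_success(ctx)   # cleared task re-runs and succeeds
on_success(ctx)   # a later success sends nothing more
print(messages)
```

In a real DAG the two returned functions would be passed as on_failure_callback and on_success_callback on the task, with the Slack call in place of notify.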
Upvotes: 8
Reputation: 1313
In my understanding:
XCom is designed for exchanging messages between tasks, and an XCom record depends on its task instance. If the task instance is cleared (deleted), the XCom history belonging to that instance is deleted as well.
That's why you get:
None when include_prior_dates=False (the task instance was deleted, so there is no such XCom record)
the previous DAG run's value when include_prior_dates=True (the task instance was deleted, but the most recent XCom from another DAG run's task instance is returned).
Here is a showcase using the default example_xcom DAG:
That's why your program gets None.
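The behavior described above can be pictured with a small toy model. This is not Airflow's implementation: ToyXComTable and its methods are hypothetical, integers stand in for execution dates, and the clearing rule is simply my understanding as stated above. It also shows why a value stored under a made-up task id is left alone.

```python
class ToyXComTable:
    """Toy stand-in for the xcom table; NOT Airflow's implementation."""

    def __init__(self):
        # (dag_id, task_id, execution_date, key) -> value
        self.rows = {}

    def push(self, dag_id, task_id, execution_date, key, value):
        self.rows[(dag_id, task_id, execution_date, key)] = value

    def clear_task(self, dag_id, task_id, execution_date):
        # Assumption: clearing a task instance drops its XCom rows for that run.
        self.rows = {k: v for k, v in self.rows.items()
                     if k[:3] != (dag_id, task_id, execution_date)}

    def pull(self, dag_id, task_id, execution_date, key,
             include_prior_dates=False):
        candidates = [
            (date, value)
            for (d, t, date, k), value in self.rows.items()
            if d == dag_id and t == task_id and k == key
            and (date == execution_date
                 or (include_prior_dates and date < execution_date))
        ]
        if not candidates:
            return None
        # The most recent matching row wins.
        return max(candidates, key=lambda c: c[0])[1]


table = ToyXComTable()
table.push('my_dag', 'my_task', 1, 'my_task_state', 'run 1 value')
table.push('my_dag', 'my_task', 2, 'my_task_state', 'run 2 value')
table.push('my_dag', 'my_task_SOME_SUFFIX', 2, 'my_key', 'kept')

# Clear the real task in run 2, as in the question.
table.clear_task('my_dag', 'my_task', 2)

current_only = table.pull('my_dag', 'my_task', 2, 'my_task_state')
with_prior = table.pull('my_dag', 'my_task', 2, 'my_task_state',
                        include_prior_dates=True)
made_up_id = table.pull('my_dag', 'my_task_SOME_SUFFIX', 2, 'my_key')
print(current_only, with_prior, made_up_id)
```

In this model the cleared run has no row left, so the pull either finds nothing or falls back to the earlier run, while the row under the made-up task id is untouched because no real task instance owns it.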
If you find this answer helpful, please vote it up. Thanks.
Upvotes: 6