Dean

Reputation: 9128

Getting XComs from cleared tasks in Airflow

I have Slack alerts for when tasks fail, but I also want recovery messages.

When the task initially fails, in its on_failure_callback it does an xcom_push. What I save here is available in the next DAG run using:

context['ti'].xcom_pull(key='my_task_state',
                        task_ids=context['task'].task_id,
                        include_prior_dates=True)

However, if I clear the failed task so that it re-runs, in its on_failure_callback/on_success_callback I try this to get the value I saved in the initial attempt:

context['ti'].xcom_pull(key='my_task_state',
                        task_ids=context['task'].task_id,
                        include_prior_dates=False)

This returns None. If I set include_prior_dates=True it’ll return the value from the previous DAG run, but not the current one where the task was cleared.

Am I doing something wrong, or is there a workaround that I can use to get the XCom value I’m looking for?

Upvotes: 3

Views: 4122

Answers (2)

Dean

Reputation: 9128

Yong Wang's answer explains really well why I wasn't able to get the values I wanted. However, I was able to come up with a workaround.

xcom_push and xcom_pull both just call class methods on XCom, and you can call these directly. As it turns out, you can pass a made-up task id, and the value is saved to the xcom table under that id. Since it doesn't belong to a real task, it won't be deleted when the task (or the DAG) is cleared.

from airflow.models import XCom

def set_xcom(context, value):
    # Store the value under a made-up task id so that clearing the real task leaves it alone.
    XCom.set(key='my_key',
             value=value,
             task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
             dag_id=context['ti'].dag_id,
             execution_date=context['ti'].execution_date)

def get_xcom(context):
    # Read the value back from the same made-up task id, for this execution date only.
    return XCom.get_one(context['ti'].execution_date,
                        key='my_key',
                        task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
                        dag_id=context['ti'].dag_id,
                        include_prior_dates=False)

It's not the standard way of using XCom, so I'll just have to be careful when upgrading to a new version of Airflow in the future.
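To show how these helpers could drive failure and recovery alerts, here is a minimal sketch. It is only an illustration: make_callbacks, get_state, set_state and notify are hypothetical names, and the store is a plain dict so the example runs without Airflow. In a real DAG you would presumably pass get_xcom and set_xcom from above, plus whatever function posts to Slack.

```python
from types import SimpleNamespace


def make_callbacks(get_state, set_state, notify):
    """Build on_failure/on_success callbacks around injected helpers (hypothetical API)."""

    def on_failure(context):
        # Always alert on failure, then remember that this task failed.
        notify("Task {} failed".format(context["ti"].task_id))
        set_state(context, "failed")

    def on_success(context):
        # Only send a recovery message if the last recorded state was a failure.
        if get_state(context) == "failed":
            notify("Task {} recovered".format(context["ti"].task_id))
        set_state(context, "ok")

    return on_failure, on_success


# Stand-ins for the XCom helpers and the Slack client, so the sketch is self-contained.
store, sent = {}, []
on_failure, on_success = make_callbacks(
    get_state=lambda ctx: store.get(ctx["ti"].task_id),
    set_state=lambda ctx, value: store.__setitem__(ctx["ti"].task_id, value),
    notify=sent.append,
)

context = {"ti": SimpleNamespace(task_id="load")}  # "load" is a made-up task id
on_failure(context)   # first attempt fails
on_success(context)   # cleared task succeeds: recovery message
on_success(context)   # later success: no message
```

After these three calls, sent holds one failure message and one recovery message. The second success stays silent because the stored state is already "ok".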

Upvotes: 8

Yong Wang

Reputation: 1313

In my understanding:

XCom is designed for exchanging messages between tasks, and an XCom record is tied to its task instance. If the task instance is cleared (deleted), the XCom history belonging to that instance is deleted as well.

That's why you get

  • None when include_prior_dates=False (the task instance was deleted, so there is no such XCom record)

  • the previous DAG run's value when include_prior_dates=True (the task instance was deleted, but the most recent XCom from an earlier DAG run's task instance is returned).
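The behaviour described above can be reproduced with a toy in-memory model. This is a sketch of the explanation only, not Airflow's implementation: ToyXComTable, the "etl" DAG id and the "load" task id are invented for illustration.

```python
from datetime import datetime


class ToyXComTable:
    """Toy stand-in for the xcom table (hypothetical, not Airflow code)."""

    def __init__(self):
        # (dag_id, task_id, execution_date, key) -> value
        self.rows = {}

    def set(self, key, value, execution_date, task_id, dag_id):
        self.rows[(dag_id, task_id, execution_date, key)] = value

    def get_one(self, execution_date, key, task_id, dag_id, include_prior_dates=False):
        candidates = [
            (date, value)
            for (d, t, date, k), value in self.rows.items()
            if d == dag_id and t == task_id and k == key
            and (date <= execution_date if include_prior_dates else date == execution_date)
        ]
        if not candidates:
            return None
        # The most recent execution date wins.
        return max(candidates, key=lambda c: c[0])[1]

    def clear_task(self, execution_date, task_id, dag_id):
        # Drop every row that belongs to this task instance.
        self.rows = {
            k: v for k, v in self.rows.items()
            if k[:3] != (dag_id, task_id, execution_date)
        }


table = ToyXComTable()
run1, run2 = datetime(2020, 1, 1), datetime(2020, 1, 2)
table.set("my_task_state", "failed in run 1", run1, "load", "etl")
table.set("my_task_state", "failed in run 2", run2, "load", "etl")
# Same value saved under a made-up task id, as in the workaround.
table.set("my_task_state", "failed in run 2", run2, "load_SOME_SUFFIX", "etl")

table.clear_task(run2, "load", "etl")  # clearing the failed task in run 2

same_run = table.get_one(run2, "my_task_state", "load", "etl")
with_prior = table.get_one(run2, "my_task_state", "load", "etl", include_prior_dates=True)
suffixed = table.get_one(run2, "my_task_state", "load_SOME_SUFFIX", "etl")
```

In this model, same_run is None, with_prior falls back to the value from run 1, and the row stored under the made-up task id survives the clear.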

Here is a demonstration using the default example_xcom DAG:

    1. All tasks succeeded:
      • 1.1 DAG view (screenshot)
      • 1.2 XCom list (screenshot)
    2. After clearing one task:
      • 2.1 Clear (screenshot)
      • 2.2 XCom list after clearing: the XCom record should be deleted, but I failed to capture a screenshot. That's why your program gets None.
      • 2.3 XCom list after the re-run (a new record with a different timestamp) (screenshot)

If you find this answer helpful, please vote it up. Thanks.

Upvotes: 6
