Zat42
Zat42

Reputation: 2911

How to pull XCOM value from on_failure_callback

When a task fails, is it possible to pull a XCOM value that has been previously set in an other task during the on_failure_callback execution?

To be more specific, exemple:

dag: task1 >> task2

Is this possible to retrieve the value of key test in the on_failure_callback ?

I tried like this, but it seems it didn't find any value:

# Daf configuration
    ...
    "on_failure_callback": deploy_failure,
    ...

# In task1
    kwargs["ti"].xcom_push(key="test", value=123)

# on_failure_callback method
def deploy_failure(context):
    print("/!\ Deploy failure callback triggered...")
    test_value = context.get("ti").xcom_pull(key="test")
    print(test_value)

test_value is None

I am sure the Xcom value is set because I can see it on the Airflow backend.

Any idea?

Upvotes: 0

Views: 2253

Answers (2)

Madhawa Manchanayake
Madhawa Manchanayake

Reputation: 31

I was able to pull the XCOM value directly using the following code. It seems like whatever issue you were facing got fixed.

def task_failure_alert(context):
     ti = context['ti']

     test_value = ti.xcom_pull(key="test")

Upvotes: 1

Philipp Johannis
Philipp Johannis

Reputation: 2936

I guess there is some issue with provide_context in the failure_callback. You can work around this by accessing directly XCom class:

from airflow.models import XCom

def deploy_failure(context):
    print("/!\ Deploy failure callback triggered...")
    test_value = XCom.get_one(execution_date = context.get('execution_date'), key='test')
    print("ALERT: {0}".format(test_value))

Upvotes: 3

Related Questions