Reputation: 2911
When a task fails, is it possible to pull an XCom value that was previously set in another task during the on_failure_callback execution?
To be more specific, here is an example:
DAG: task1 >> task2
task1 pushes key="test" with value=123 to XCom.
Is it possible to retrieve the value of key test in the on_failure_callback?
I tried the following, but it did not seem to find any value:
# DAG configuration
...
"on_failure_callback": deploy_failure,
...
# In task1
kwargs["ti"].xcom_push(key="test", value=123)
# on_failure_callback method
def deploy_failure(context):
    # The backslash is escaped so Python does not treat "\ " as an escape sequence
    print("/!\\ Deploy failure callback triggered...")
    test_value = context.get("ti").xcom_pull(key="test")
    print(test_value)
test_value is None
I am sure the XCom value is set because I can see it in the Airflow backend.
Any idea?
Upvotes: 0
Views: 2253
Reputation: 31
I was able to pull the XCom value directly using the following code. It seems like whatever issue you were facing got fixed.
def task_failure_alert(context):
    ti = context['ti']
    test_value = ti.xcom_pull(key="test")
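If the pull still returns None for you, one thing worth trying is naming the producing task explicitly through the task_ids argument of xcom_pull. This is a guess at a more robust variant, not a confirmed diagnosis of the original problem. The sketch below assumes the task that pushed the value is called "task1", as in the question's DAG, and it fakes the task instance with MagicMock so the callback can be exercised without a running scheduler. The fake_ti and result names are purely illustrative.

```python
from unittest.mock import MagicMock


def deploy_failure(context):
    """Failure callback that reads an XCom pushed by an upstream task."""
    ti = context["ti"]  # TaskInstance of the task that failed
    # Name the producing task explicitly instead of relying on the default.
    # "task1" is an assumption taken from the question's DAG (task1 >> task2).
    test_value = ti.xcom_pull(task_ids="task1", key="test")
    print("Deploy failure callback triggered, test = {0}".format(test_value))
    return test_value


# Exercise the callback outside Airflow with a stand-in TaskInstance.
fake_ti = MagicMock()
fake_ti.xcom_pull.return_value = 123
result = deploy_failure({"ti": fake_ti})
```

In a real DAG you would only keep the deploy_failure function and register it as on_failure_callback; the MagicMock part is just a quick way to check the callback's logic locally.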
Upvotes: 1
Reputation: 2936
I guess there is some issue with provide_context in the on_failure_callback. You can work around this by accessing the XCom class directly:
from airflow.models import XCom
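def deploy_failure(context):
    # The backslash is escaped so Python does not treat "\ " as an escape sequence
    print("/!\\ Deploy failure callback triggered...")
    # Look the value up by execution date and key, bypassing the task instance
    test_value = XCom.get_one(execution_date=context.get('execution_date'), key='test')
    print("ALERT: {0}".format(test_value))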
Upvotes: 3