Reputation: 33
I am writing a PythonOperator task for an Airflow DAG and am using the on_success_callback option to write to an endpoint. I need to pass in the output of whatever was executed in the python callable to the on_success_callback parameter so that it can write the relevant information.
test_task = PythonOperator(
    task_id="test_task",
    on_success_callback=callback_func(json=task_instance.xcom_pull['test_task']),
    python_callable=get_output,
    op_kwargs={"id": other_task.output['id']},
    provide_context=True
)
I've tried a few things like context['ti'].xcom_pull but the DAG fails to even register. Any ideas on how to do this?
Upvotes: 1
Views: 591
Reputation: 4658
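Airflow calls on_success_callback with a single argument, the task context dictionary. Writing on_success_callback=callback_func(json=...) calls the function while the DAG file is being parsed, when no task instance exists yet, which is likely why the DAG fails to register. Pass the function itself, without calling it, and pull the XCom from the task instance inside the callback. provide_context=True is unnecessary on Airflow 2 and later, where the context is passed automatically.

from airflow.operators.python import PythonOperator


def callback_func(context):
    # Airflow passes the task context dict as the only positional argument.
    ti = context['ti']
    value = ti.xcom_pull(task_ids='test_task')
    print(f"Callback value: {value}")


def get_output(id, **kwargs):
    value = "Value from get_output function"
    # The return value is pushed to XCom under the key 'return_value' automatically.
    return value


test_task = PythonOperator(
    task_id="test_task",
    python_callable=get_output,
    op_kwargs={"id": other_task.output['id']},
    on_success_callback=callback_func,
)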
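If the callback also needs values that are known when the DAG is defined (for example, the endpoint to write to), one option is to bind them with functools.partial and still hand Airflow an uncalled function. The sketch below runs without Airflow: FakeTaskInstance, notify_endpoint, the sent list, and the example URL are all hypothetical names made up for illustration, and the last call only imitates Airflow invoking the callback with a context dict.

```python
from functools import partial


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> value returned by that task

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def notify_endpoint(context, endpoint, sent):
    """Callback: `context` is supplied at call time, the rest is pre-bound."""
    ti = context["ti"]
    payload = ti.xcom_pull(task_ids="test_task")
    # A real callback would send the payload to the endpoint here;
    # this sketch only records what would have been sent.
    sent.append((endpoint, payload))


sent = []

# Bind the extra arguments now; the result is still a function of `context`.
callback = partial(notify_endpoint, endpoint="https://example.invalid/hook", sent=sent)

# Imitate the scheduler: after the task succeeds, call callback(context).
callback({"ti": FakeTaskInstance({"test_task": {"id": 42}})})
print(sent)
```

In a real DAG, the bound function would be passed as on_success_callback=callback, and Airflow would supply the context argument itself.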
Upvotes: 0