traase

Reputation: 33

Pass the return value of the same task to the on_success_callback parameter

I am writing a PythonOperator task for an Airflow DAG and am using the on_success_callback option to write to an endpoint. I need to pass in the output of whatever was executed in the python callable to the on_success_callback parameter so that it can write the relevant information.

    test_task = PythonOperator(
        task_id="test_task",
        on_success_callback=callback_func(json=task_instance.xcom_pull['test_task']),
        python_callable=get_output,
        op_kwargs={"id": other_task.output['id']},
        provide_context=True
    )

I've tried a few things like context['ti'].xcom_pull but the DAG fails to even register. Any ideas on how to do this?

Upvotes: 1

Views: 591

Answers (1)

Saxtheowl

Reputation: 4658

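Airflow calls on_success_callback with a single positional argument, the task context dictionary. Your DAG fails to register because callback_func(...) is called while the file is being parsed, when no task_instance exists yet. Pass the function itself instead and pull the return value inside the callback via context['ti']. The return value of python_callable is pushed to XCom automatically under the key return_value (as long as do_xcom_push is left at its default), so no explicit xcom_push is needed. provide_context is not needed in Airflow 2 and has no effect on callbacks.

from airflow.operators.python import PythonOperator  # Airflow 2 import path

def callback_func(context):
    # Airflow passes the task context dict as the only argument.
    value = context['ti'].xcom_pull(task_ids='test_task')
    print(f"Callback value: {value}")

def get_output(id):
    # The return value is stored in XCom as 'return_value' automatically.
    return "Value from get_output function"

test_task = PythonOperator(
    task_id="test_task",
    python_callable=get_output,
    op_kwargs={"id": other_task.output['id']},
    on_success_callback=callback_func,  # pass the function, do not call it
)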
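
If you want to check the callback logic before deploying the DAG, you can call it by hand, since all it needs is a context dictionary with a 'ti' entry. The following is only a rough sketch: FakeTaskInstance, build_payload and the sent list are hypothetical names made up for this example, not part of Airflow, and the list stands in for the real request to your endpoint.

```python
import json


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> stored return value

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Mimics only the single-task, return_value lookup used below.
        return self._xcoms.get(task_ids)


def build_payload(context):
    """Build the JSON body from the context dict the callback receives."""
    output = context["ti"].xcom_pull(task_ids="test_task")
    return json.dumps({"task_id": "test_task", "output": output})


sent = []  # records payloads instead of making a real HTTP request


def callback_func(context):
    # In a DAG, Airflow would supply `context`; here it is built by hand.
    sent.append(build_payload(context))


callback_func({"ti": FakeTaskInstance({"test_task": {"id": 7}})})
print(sent[0])
```

In the real DAG you would replace the sent.append(...) line with the call that writes to your endpoint and keep on_success_callback=callback_func unchanged.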

Upvotes: 0
