Alisson Hayasi

Is there any way to pass multiple arguments to a Python callable through XComArgs in Airflow?

Since Apache Airflow 2.0.0, we can easily pass the output of one function as the input of another through the @task decorator, as shown in the code below:

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

@task(multiple_outputs=True)
def values():
    return {
        "value_one": "I'm a value!",
        "value_two": "I'm another value!",
    }

@task
def show(value_one, value_two):
    print(value_one, value_two)

with DAG(
    dag_id="task_decorator",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:

    values_output = values()
    show(values_output["value_one"], values_output["value_two"])

A similar version of the code above, using XComArgs directly (the feature behind @task), can be written as follows:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

def values():
    return {
        "value_one": "I'm a value!",
        "value_two": "I'm another value!",
    }

def show(value):
    print(value)

with DAG(
    dag_id="task_decorator",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:

    values_task = PythonOperator(
        task_id="values",
        python_callable=values,
    )
    show_task = PythonOperator(
        task_id="show",
        python_callable=show,
        op_args=[values_task.output],
    )
    values_task >> show_task

However, this way I can't pass the outputs as separate arguments, as I can with @task.

So, does anyone know a workaround that would allow something like the code below?

show_task = PythonOperator(
    task_id="show",
    python_callable=show,
    op_args=[values_task.output["value_one"], values_task.output["value_two"]],
)

Answers (1)

NicoE

The provided example does work if you use op_kwargs instead of op_args. This is because of the value that values() returns and pushes to XCom: it is a dict, so when op_kwargs resolves to it, its keys (value_one and value_two) are passed to show() as keyword arguments.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

def values():
    return {
        "value_one": "I'm a value!",
        "value_two": "I'm another value!",
    }


def show(value_one, value_two):
    print(value_one, value_two)


with DAG(
    dag_id="task_decorator",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:

    values_task = PythonOperator(
        task_id="values",
        python_callable=values
    )

    show_task = PythonOperator(
        task_id="show",
        python_callable=show,
        op_kwargs=values_task.output
    )
    values_task >> show_task

Logs output:

[2021-05-13 23:36:15,084] {logging_mixin.py:104} INFO - I'm a value! I'm another value!
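To see why op_kwargs splits the values while op_args=[values_task.output] does not, here is a plain-Python sketch that runs without Airflow. It assumes the XComArg has already been resolved to the dict returned by values(), and it roughly models PythonOperator's call as `python_callable(*op_args, **op_kwargs)`; the helper `call_like_python_operator` is hypothetical and not part of Airflow's API.

```python
from typing import Any, Callable, Dict, List, Optional


def call_like_python_operator(
    python_callable: Callable[..., Any],
    op_args: Optional[List[Any]] = None,
    op_kwargs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical helper (not an Airflow API): roughly models how
    PythonOperator invokes its callable once op_args/op_kwargs are resolved."""
    return python_callable(*(op_args or []), **(op_kwargs or {}))


def values() -> Dict[str, str]:
    # Same return value as the values() task above.
    return {
        "value_one": "I'm a value!",
        "value_two": "I'm another value!",
    }


def show(value_one: str, value_two: str) -> str:
    line = f"{value_one} {value_two}"
    print(line)
    return line


resolved = values()  # stands in for the resolved values_task.output

# op_kwargs=<dict>: the dict's keys become keyword arguments of show().
by_kwargs = call_like_python_operator(show, op_kwargs=resolved)

# op_args=[<dict>]: the whole dict is a single positional argument,
# so show(value_one, value_two) is missing value_two and raises TypeError.
try:
    call_like_python_operator(show, op_args=[resolved])
    single_arg_failed = False
except TypeError:
    single_arg_failed = True
```

The second call mirrors the question's op_args=[values_task.output]: the callable receives one dict, which is why the question's version of show() had to take a single value parameter.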

If the first task returns a list instead, you can use op_args, since the list's elements will be unpacked as positional arguments of your callable. I hope that works for you!
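Under the same simplified model, here is a plain-Python sketch of the list case. The helper `call_like_python_operator` is hypothetical (not an Airflow API) and only approximates PythonOperator's `python_callable(*op_args, **op_kwargs)` call; `values_as_list()` is a hypothetical variant of values() that returns a list instead of a dict.

```python
from typing import Any, Callable, Dict, List, Optional


def call_like_python_operator(
    python_callable: Callable[..., Any],
    op_args: Optional[List[Any]] = None,
    op_kwargs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical helper (not an Airflow API): roughly models how
    PythonOperator invokes its callable once op_args/op_kwargs are resolved."""
    return python_callable(*(op_args or []), **(op_kwargs or {}))


def values_as_list() -> List[str]:
    # Hypothetical variant of values() that returns a list.
    return ["I'm a value!", "I'm another value!"]


def show(value_one: str, value_two: str) -> str:
    line = f"{value_one} {value_two}"
    print(line)
    return line


# op_args=values_task.output (no surrounding brackets) would resolve to the
# list itself, so its elements become the positional arguments of show().
by_args = call_like_python_operator(show, op_args=values_as_list())
```

In the DAG this would correspond to op_args=values_task.output without wrapping it in another list; writing op_args=[values_task.output] would again pass the whole list as a single argument.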
