Since Apache Airflow 2.0.0, we can easily pass the output of one function as the input to another through the @task decorator, as shown in the code below:
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

@task(multiple_outputs=True)
def values():
    return {
        "value_one": "I'm a value!",
        "value_two": "I'm another value!",
    }

@task
def show(value_one, value_two):
    print(value_one, value_two)

with DAG(
    dag_id="task_decorator",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    values = values()
    show(values["value_one"], values["value_two"])
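As I understand it, `values["value_one"]` works in the decorated version because `multiple_outputs=True` makes the task push each key of the returned dict as its own XCom entry, alongside the usual `return_value` entry. The following is only a toy, pure-Python model of that behaviour, not Airflow's API: the `xcom_store` dict and the `run_task` helper are hypothetical stand-ins for the metadata database and the task runner.

```python
from typing import Any, Callable, Dict, Tuple

# Hypothetical stand-in for Airflow's XCom table: (task_id, key) -> value.
XComStore = Dict[Tuple[str, str], Any]


def run_task(store: XComStore, task_id: str, fn: Callable[[], Any],
             multiple_outputs: bool = False) -> Any:
    """Toy runner: push the whole return value, plus each dict key if multiple_outputs."""
    result = fn()
    store[(task_id, "return_value")] = result
    if multiple_outputs:
        if not isinstance(result, dict):
            raise TypeError("multiple_outputs requires a dict return value")
        for key, value in result.items():
            store[(task_id, key)] = value
    return result


def values() -> Dict[str, str]:
    return {"value_one": "I'm a value!", "value_two": "I'm another value!"}


def show(value_one: str, value_two: str) -> str:
    line = f"{value_one} {value_two}"
    print(line)
    return line


xcom_store: XComStore = {}
run_task(xcom_store, "values", values, multiple_outputs=True)
# Each key can now be pulled on its own, like values["value_one"] in the DAG.
show(xcom_store[("values", "value_one")], xcom_store[("values", "value_two")])
```

Without `multiple_outputs=True`, only the `("values", "return_value")` entry would exist in this model, so there would be no per-key entry to pull.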
A similar version of the above code, using only XComArgs (the feature behind @task), can be written as follows:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

def values():
    return {
        "value_one": "I'm a value!",
        "value_two": "I'm another value!",
    }

def show(value):
    print(value)

with DAG(
    dag_id="task_decorator",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    values_task = PythonOperator(
        task_id="values",
        python_callable=values,
    )
    show_task = PythonOperator(
        task_id="show",
        python_callable=show,
        op_args=[values_task.output],
    )
    values_task >> show_task
However, this way I can't pass the outputs as separate arguments, as I can with @task.
So, does anyone know a workaround that would allow something like the code below?
show_task = PythonOperator(
    task_id="show",
    python_callable=show,
    op_args=[values_task.output["value_one"], values_task.output["value_two"]],
)
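The provided example works if you use op_kwargs instead of op_args. This comes down to what gets pushed to XCom: the return value of the values() function is a single dict, so with op_args=[values_task.output] the show() callable receives the whole dict as one positional argument, whereas op_kwargs=values_task.output unpacks the dict's keys as keyword arguments matching show()'s parameters.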
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

def values():
    return {
        "value_one": "I'm a value!",
        "value_two": "I'm another value!",
    }

def show(value_one, value_two):
    print(value_one, value_two)

with DAG(
    dag_id="task_decorator",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    values_task = PythonOperator(
        task_id="values",
        python_callable=values
    )
    show_task = PythonOperator(
        task_id="show",
        python_callable=show,
        op_kwargs=values_task.output
    )
    values_task >> show_task
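The difference between the two versions is in how the resolved value reaches the callable: PythonOperator roughly ends up calling `python_callable(*op_args, **op_kwargs)`. The sketch below reproduces only that final call in plain Python, with the XCom value already resolved to the dict. `call_like_python_operator` and `show_one` are hypothetical names for illustration, and the sketch ignores the Airflow context that the real operator also merges into the keyword arguments.

```python
from typing import Any, Callable, Dict, List, Optional


def call_like_python_operator(python_callable: Callable[..., Any],
                              op_args: Optional[List[Any]] = None,
                              op_kwargs: Optional[Dict[str, Any]] = None) -> Any:
    """Hypothetical helper: only the final unpacking step, XComs already resolved."""
    return python_callable(*(op_args or []), **(op_kwargs or {}))


# What values_task.output resolves to at runtime in both DAGs.
resolved = {"value_one": "I'm a value!", "value_two": "I'm another value!"}


def show_one(value):
    # Question's version: op_args=[values_task.output] -> one positional argument.
    return value


def show(value_one, value_two):
    # Answer's version: op_kwargs=values_task.output -> keys become keyword arguments.
    return f"{value_one} {value_two}"


print(call_like_python_operator(show_one, op_args=[resolved]))  # the whole dict
print(call_like_python_operator(show, op_kwargs=resolved))      # I'm a value! I'm another value!
```

One consequence of this design is that the dict keys have to match the parameter names of show(): a key such as `value_1` would not bind to `value_one`.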
Logs output:
[2021-05-13 23:36:15,084] {logging_mixin.py:104} INFO - I'm a value! I'm another value!
If you return a list from the first task instead, you can use op_args, since the list elements will be unpacked as positional arguments of your callable. I hope that works for you!
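A minimal sketch of the list variant, assuming the first task's callable returns a list and the resolved XCom is passed directly as `op_args` (that is, `op_args=values_task.output`, not wrapped in another list). Like a plain function call, it only mimics the final `*op_args` unpacking; `values_as_list` is a hypothetical variant of values().

```python
from typing import List


def values_as_list() -> List[str]:
    # Hypothetical variant of values() that returns a list instead of a dict.
    return ["I'm a value!", "I'm another value!"]


def show(value_one: str, value_two: str) -> str:
    return f"{value_one} {value_two}"


# Assumption: with op_args=values_task.output, the resolved list itself becomes
# op_args, so its items are unpacked positionally into show(), in order.
op_args = values_as_list()
print(show(*op_args))  # I'm a value! I'm another value!
```

Here the order of the list matters instead of the key names: the first element binds to `value_one`, the second to `value_two`.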