Reputation: 186
I have a task that returns a tuple. Passing one element of that tuple to another task does not work. I can pass the entire tuple, but not a single element of the return value:
from airflow.decorators import dag, task
from pendulum import datetime

@task
def create():
    return 1, 2

@task
def consume(one):
    print('arg is', one)

@dag(
    schedule_interval='@once',
    start_date=datetime(2022, 4, 10),
)
def test_dag():
    out = create()
    consume(out[0])  # does not work: the task gets None as argument
    consume(out)  # this works

dag = test_dag()
Upvotes: 4
Views: 5082
Reputation: 324
In case you don't want to return a dict, you can instead pass the create task's result directly to the consume task and index it there.
from airflow.decorators import dag, task
from pendulum import datetime

@dag(
    schedule_interval='@once',
    start_date=datetime(2022, 4, 10),
)
def test_dag():
    @task
    def create():
        # log: INFO - Done. Returned value was: (1, 2)
        return 1, 2

    @task
    def consume(my_tuple):
        # log: INFO - 1
        print(my_tuple[0])

    consume(create())

dag = test_dag()
The @task functions work whether they are defined inside the DAG function, as above, or outside it, as in your original example.
Upvotes: 0
Reputation: 3589
Within TaskFlow, the object returned from a TaskFlow function is actually an XComArg. These XComArgs are abstractions over the classic task_instance.xcom_pull(...) retrieval of XComs. Additionally, XComArg objects implement __getitem__ for specifying an XCom key other than "return_value" (which is the default).
So in the case of consume(out[0]), Airflow uses the XComArg object to retrieve an XCom with a key of 0, rather than retrieving the output from create() and then taking its first item. Behind the scenes, the call is task_instance.xcom_pull(task_ids="create", key=0).
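To make the key lookup concrete, here is a minimal pure-Python sketch of the behaviour described above. It is a toy model, not Airflow's implementation: ToyXComArg, XCOM_STORE and resolve are hypothetical names invented for illustration, and a plain dict stands in for the XCom table.

```python
# Toy model of the lookup described above; NOT Airflow's real code.
# XCOM_STORE, ToyXComArg and resolve() are hypothetical names.

# A dict keyed by (task_id, key) stands in for the XCom table.
XCOM_STORE = {("create", "return_value"): (1, 2)}


class ToyXComArg:
    """Lazy reference to one XCom of one task."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __getitem__(self, item):
        # Indexing does not index into the value; it selects another key.
        return ToyXComArg(self.task_id, key=item)

    def resolve(self):
        # Stands in for xcom_pull, which yields None when no XCom matches.
        return XCOM_STORE.get((self.task_id, self.key))


out = ToyXComArg("create")
print(out.resolve())      # the whole tuple stored under "return_value"
print(out[0].resolve())   # no XCom has key 0, so this is None
print(out.resolve()[0])   # indexing after retrieval gives the first item
```

In this model, out[0] never touches the stored tuple. It only changes which key is looked up, which is why the consuming task receives None.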
Yes, this is unexpected in a way, and it's not quite in line with the classic xcom_pull() approach. An issue has been opened to try to achieve feature parity.
In the meantime, you can of course access the whole XComArg, as you show, by just using consume(out), or you can update the TaskFlow function to return a dictionary and use multiple_outputs to have each key/value pair serialized as its own XCom.
For example:
from pendulum import datetime
from airflow.decorators import dag, task

@task(multiple_outputs=True)
def create():
    return {"one": 1, "two": 2}

@task
def consume(arg):
    print('arg is', arg)

@dag(
    schedule_interval='@once',
    start_date=datetime(2022, 4, 10),
)
def test_dag():
    out = create()
    consume(out["one"])

dag = test_dag()
The create task then produces separate XComs, one under the key "one" and one under the key "two".
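As a rough illustration of what ends up stored, the sketch below models the push step in plain Python. It assumes, as a simplification, that the full dictionary is still kept under "return_value" alongside the per-key entries. toy_push and the dict-based store are hypothetical and are not Airflow APIs.

```python
# Toy model of multiple_outputs; toy_push() and the store are hypothetical.

def toy_push(store, task_id, result, multiple_outputs=False):
    """Record a task's return value in a dict standing in for the XCom table."""
    if multiple_outputs:
        if not isinstance(result, dict):
            raise TypeError("multiple_outputs needs a dict return value")
        # One entry per key/value pair of the returned dictionary.
        for key, value in result.items():
            store[(task_id, key)] = value
    # Assumption: the whole result is also kept under "return_value".
    store[(task_id, "return_value")] = result


store = {}
toy_push(store, "create", {"one": 1, "two": 2}, multiple_outputs=True)
print(store[("create", "one")])  # the entry that out["one"] would look up
```

With an entry stored per key, a lookup such as out["one"] finds a matching XCom, unlike the out[0] lookup in the question.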
Side note: multiple_outputs can also be inferred if the TaskFlow function has a dictionary return type annotation. This sets multiple_outputs=True based on the return annotation:
from typing import Dict

@task
def create() -> Dict[str, int]:
    return {"one": 1, "two": 2}
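One way such an inference could work is to inspect the function's return annotation. The sketch below uses only the standard typing module. looks_like_dict_return is a hypothetical helper, and Airflow's own check may differ in detail.

```python
from typing import Dict, get_origin, get_type_hints


def looks_like_dict_return(func):
    """Return True if func's return annotation is dict or Dict[...]."""
    hints = get_type_hints(func)
    annotation = hints.get("return")
    # Bare `dict` has no origin; Dict[str, int] has origin `dict`.
    return annotation is dict or get_origin(annotation) is dict


def create() -> Dict[str, int]:
    return {"one": 1, "two": 2}


def create_tuple():
    return 1, 2


print(looks_like_dict_return(create))
print(looks_like_dict_return(create_tuple))
```

An unannotated function, like the tuple-returning create in the question, gives no signal here, so multiple_outputs would have to be set explicitly.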
Upvotes: 11