hmad

Reputation: 186

Issue with passing a return value from a task as an argument to another task

I have a task that returns a tuple. Passing one element of that tuple to another task does not work. I can pass the entire tuple, but not a single element of the return value:

    from airflow.decorators import dag, task
    from pendulum import datetime
    
    
    @task
    def create():
        return 1, 2
    
    
    @task
    def consume(one):
        print('arg is', one)
    
    
    @dag(
        schedule_interval='@once',
        start_date=datetime(2022, 4, 10),
    )
    def test_dag():
        out = create()
        consume(out[0])  # does not work: the task gets None as argument
        consume(out)     # this works
    
    dag = test_dag()

Upvotes: 4

Views: 5082

Answers (2)

mochatiger

Reputation: 324

If you don't want to return a dict, you can instead pass the create task's whole result to the consume task and index it there.

    from airflow.decorators import dag, task
    from pendulum import datetime


    @dag(
        schedule_interval='@once',
        start_date=datetime(2022, 4, 10),
    )
    def test_dag():
        @task
        def create():
            # log: INFO - Done. Returned value was: (1, 2)
            return 1, 2

        @task
        def consume(my_tuple):
            # log: INFO - 1
            print(my_tuple[0])

        consume(create())


    dag = test_dag()

The @task functions work when defined inside the DAG function as above, or outside it as in your original example.

Upvotes: 0

Josh Fell

Reputation: 3589

Within TaskFlow, the object returned from a TaskFlow function is actually an XComArg. XComArgs are abstractions over the classic task_instance.xcom_pull(...) retrieval of XComs. Additionally, XComArg objects implement __getitem__ to specify an XCom key other than "return_value" (the default).

So with consume(out[0]), Airflow uses the XComArg to retrieve an XCom whose key is 0. It does not retrieve the output of create() and then take its first item. Behind the scenes, the call is task_instance.xcom_pull(task_ids="create", key=0). No XCom exists under that key, which is why the task receives None.

Yes, this is unexpected in a way, and it's not quite in line with the classic xcom_pull() approach. An issue has been opened to try to achieve feature parity.

In the meantime, you can access the whole XComArg, as you show, with consume(out). Alternatively, you can update the TaskFlow function to return a dictionary and use multiple_outputs so that each key/value pair is serialized as its own XCom.
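The lookup described above can be mimicked in plain Python. This is only a toy model, not Airflow's implementation: ToyXComArg, resolve and xcom_store are hypothetical names invented for illustration, and the dict stands in for the XCom table.

```python
class ToyXComArg:
    """Toy stand-in for an XComArg: a (task_id, key) reference, not a value."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __getitem__(self, item):
        # Indexing picks a different XCom key; it does not index the value.
        return ToyXComArg(self.task_id, key=item)

    def resolve(self, store):
        # Like a pull for a key that was never pushed: the result is None.
        return store.get((self.task_id, self.key))


# What create() leaves behind in this model: one XCom under the default key.
xcom_store = {("create", "return_value"): (1, 2)}

out = ToyXComArg("create")
whole = out.resolve(xcom_store)
first = out[0].resolve(xcom_store)

print(whole)  # (1, 2)
print(first)  # None
```

In this model, out[0] never touches the tuple. It only builds a new reference with key 0, which matches the None seen in the question.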

For example:

    from pendulum import datetime

    from airflow.decorators import dag, task


    @task(multiple_outputs=True)
    def create():
        return {"one": 1, "two": 2}


    @task
    def consume(arg):
        print('arg is', arg)


    @dag(
        schedule_interval='@once',
        start_date=datetime(2022, 4, 10),
    )
    def test_dag():
        out = create()
        consume(out["one"])


    dag = test_dag()

[Image: separate XComs created from the create task]

[Image: consume task log]
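To make the effect of multiple_outputs concrete, here is a rough sketch in plain Python. It is a toy model under stated assumptions, not Airflow code: push_result and the dict-based store are hypothetical, and keeping the whole dict under "return_value" alongside the per-key entries reflects my understanding of the behavior rather than something shown in this thread.

```python
def push_result(store, task_id, result, multiple_outputs=False):
    """Hypothetical helper: record a task's result in a dict-based XCom store."""
    if multiple_outputs:
        if not isinstance(result, dict):
            raise TypeError("multiple_outputs expects a dict return value")
        # Each key/value pair becomes its own entry.
        for key, value in result.items():
            store[(task_id, key)] = value
    # Assumption: the full result is also kept under the default key.
    store[(task_id, "return_value")] = result


store = {}
push_result(store, "create", {"one": 1, "two": 2}, multiple_outputs=True)

one = store.get(("create", "one"))
missing = store.get(("create", 0))

print(one)      # 1
print(missing)  # None
```

With a separate entry per key, a lookup such as out["one"] has something to find, whereas key 0 still has no entry.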

Side note: multiple_outputs can also be inferred when the TaskFlow function has a dictionary return type annotation. The following sets multiple_outputs=True based on the return annotation:

    from typing import Dict

    from airflow.decorators import task


    @task
    def create() -> Dict[str, int]:
        return {"one": 1, "two": 2}
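For intuition, annotation-based inference of this kind could be sketched with the standard typing helpers. This is not Airflow's actual code: infers_multiple_outputs is a hypothetical helper, and the rule it applies (a return annotation whose origin is dict) is an assumption made for illustration.

```python
from typing import Dict, Tuple, get_origin, get_type_hints


def infers_multiple_outputs(func) -> bool:
    """Hypothetical helper: True when the return annotation is a dict type."""
    return_type = get_type_hints(func).get("return")
    if return_type is None:
        # No return annotation at all.
        return False
    # Dict[str, int] has origin dict; a bare dict has no origin.
    origin = get_origin(return_type) or return_type
    return origin is dict


def create_dict() -> Dict[str, int]:
    return {"one": 1, "two": 2}


def create_tuple() -> Tuple[int, int]:
    return 1, 2


def create_plain():
    return 1, 2


print(infers_multiple_outputs(create_dict))   # True
print(infers_multiple_outputs(create_tuple))  # False
print(infers_multiple_outputs(create_plain))  # False
```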

Upvotes: 11
