rodrigo-silveira

Reputation: 13088

Airflow 2 loosely coupling @task return values to receiving @task?

I'm trying to write two tasks that know nothing about each other. One task returns a dict (as an XComArg), and I want to pass a single key of that dict to the next task. If I pass the entire XComArg object, its value is populated as expected, but selecting a single key results in None.

from airflow.decorators import dag, task

@dag(...)
def _dag():

   @task
   def A(**ctx):
      # ...
      return {'a': 42, 'b': 'B', 'c': 'C'}

   @task
   def B(a, _res, **ctx):
      print('A', a) # >>> A None
      print('RES', _res) # >>> RES {'a': 42, ...}

   res = A()
   # Pass one key and, for comparison, the whole XComArg
   B(res['a'], res)

dag = _dag()

Ideally, B doesn't know where the value for a comes from, nor how to get it. Yes, passing all of res and having B extract what it needs with res['a'] works, but my goal is loose coupling.

Upvotes: 1

Views: 630

Answers (1)

Jarek Potiuk

Reputation: 20097

See example in https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html

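You need to specify multiple_outputs=True in the decorator of task A, i.e. @task(multiple_outputs=True). With that flag, each top-level key of the returned dict is pushed as its own XCom, so res['a'] resolves to 42. Without it, the dict is stored only under the default return_value key, and res['a'] looks up an XCom key 'a' that was never pushed, which is why B receives None.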
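
To make the difference concrete, here is a toy model in plain Python of how the return value ends up keyed in XCom with and without the flag. This is only a sketch of my understanding of the behaviour, not Airflow's actual code: push_return_value and pull are hypothetical helper names, and a plain dict stands in for the XCom backend.

```python
# Toy model of XCom key resolution. The helper names are hypothetical
# and are NOT Airflow APIs; a plain dict stands in for the XCom store.

def push_return_value(store, value, multiple_outputs=False):
    """Mimic how a @task return value is stored as XCom entries."""
    # The whole return value is always stored under the default key.
    store["return_value"] = value
    # With multiple_outputs=True, each top-level dict key also gets its own entry.
    if multiple_outputs and isinstance(value, dict):
        for key, item in value.items():
            store[key] = item


def pull(store, key="return_value"):
    """Mimic an XCom pull: a missing key yields None rather than an error."""
    return store.get(key)


result = {"a": 42, "b": "B", "c": "C"}

# Default behaviour: only "return_value" exists, so key "a" is missing.
plain = {}
push_return_value(plain, result)
print("A", pull(plain, "a"))
print("RES", pull(plain))

# multiple_outputs=True: key "a" has its own entry and can be selected directly.
unrolled = {}
push_return_value(unrolled, result, multiple_outputs=True)
print("A", pull(unrolled, "a"))
```

In the DAG from the question, the only change needed is the decorator on A; B can keep taking a plain value and stays unaware of where it comes from.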

Upvotes: 2
