Reputation: 13088
I'm trying to write two tasks that have no knowledge of the other. One task returns a dict (via XComArg
) and I want to pass a single property of that object to the next task. If I pass the entire XComArg
object, its value is populated as expected. But selecting a single property results in a None
.
@dag(...):
def _dag():
@task
def A(**ctx):
# ...
return {'a': 42, 'b': 'B', 'c': 'C'}
@task
def B(a, _res, **ctx):
print('A', a) # >>> A None
print('RES', _res) # >>> RES {'a': 42, ...}
res = A()
B(res['a'])
dag = _dag
Ideally, B
doesn't know where the value for a
comes from, nor how to get it. Yes, passing all of res
and having B
extract what it needs with res['a']
works, but my goal is loose coupling.
Upvotes: 1
Views: 630
Reputation: 20097
See example in https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
You need to specify 'multiple_outputs=true" in task A
Upvotes: 2