mad_

Reputation: 8273

How to retrieve nested output from XCom using taskflow syntax in Airflow

I believe this is possible, but I don't know how. I am using traditional operators (without the @task decorator), and I am interested in the XComArg output these operators return, which can be used in downstream tasks. Below is a sample:

task_1 = DummyOperator(
    task_id = 'task_1'
) # returns {"data": {"foo" : [{"cmd": "ls"}]}}


task_2 = BashOperator(
    task_id='task_2',
    bash_command=task_1.output['return_value']['data']['foo'][0]['cmd']  # does not give what I need; returns null

    # bash_command="{{ ti.xcom_pull(task_ids='task_1', key='return_value')['data']['foo'][0]['cmd'] }}"  # gives what I need
)

In this example, the pure Jinja templating approach works for me, but the new XComArg syntax does not. I have tried setting render_template_as_native_obj=True in the DAG configuration, but it changes nothing. I want to use .output, which returns an XComArg object. It gives me the complete dict, but I have not been able to access the nested keys as shown above. I have also tried converting the string to JSON and similar combinations, but none of them work.

Upvotes: 2

Views: 1785

Answers (1)

Josh Fell

Reputation: 3629

Unfortunately, the inability to retrieve nested values from XComArgs is a limitation of the TaskFlow API.

The TaskFlow API uses __getitem__ to override which XCom key is used, so each subscript replaces the key instead of indexing into the stored value. In your example, the key ends up being "cmd" rather than the value that cmd maps to in the nested object. You'll have to use the original ti.xcom_pull() method until that limitation is addressed.
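To make the key-override behavior concrete, here is a minimal sketch in plain Python. ToyXComArg is a hypothetical stand-in written for illustration, not Airflow's actual class; it only assumes the behavior described above, where each subscript swaps the key rather than descending into the stored dict. The template string at the end is the Jinja form from the question, written as a plain string (an f-string would collapse the double braces).

```python
class ToyXComArg:
    """Toy stand-in for an XComArg-like reference (NOT Airflow's class)."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __getitem__(self, item):
        # Subscripting does not descend into the stored value; it returns a
        # new reference whose XCom key is replaced by the subscript.
        return ToyXComArg(self.task_id, key=item)


output = ToyXComArg("task_1")

# Every subscript overwrites the key, so only the last one survives.
nested = output["return_value"]["data"]["foo"][0]["cmd"]
print(nested.key)  # prints: cmd

# The Jinja form indexes the pulled value at render time instead.
# Plain string on purpose: an f-string would turn "{{" into "{".
template = (
    "{{ ti.xcom_pull(task_ids='task_1', key='return_value')"
    "['data']['foo'][0]['cmd'] }}"
)
```

In this model the final reference asks for an XCom stored under the key "cmd", which task_1 never pushed, and that would be consistent with the null you are seeing.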

Upvotes: 4
