Reputation: 21
I have a list of HTTP endpoints, each performing a task on its own. We are trying to write an application which will orchestrate them by invoking these endpoints in a certain order. In this solution we also have to process the output of one HTTP endpoint and generate the input for the next HTTP endpoint. Also, the same workflow can get invoked simultaneously depending on the trigger.
What I have done until now:
1. Defined a new operator deriving from the HttpOperator and introduced capabilities to write the output of the HTTP endpoint to a file.
2. Written a Python operator which can transform the output depending on the necessary logic.
Since I can have multiple instances of the same workflow in execution, I could not hardcode the output file names. Is there a way to make the HTTP operator I wrote write to a unique file name, with the same file name made available to the next task so that it can read and process the output?
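A minimal sketch of the setup described above (the operator name HttpToFileOperator, the connection handling and the output_path parameter are illustrative, not the actual code):

# Sketch only: names and the file-writing detail are illustrative.
from airflow.hooks.http_hook import HttpHook
from airflow.operators.http_operator import SimpleHttpOperator


class HttpToFileOperator(SimpleHttpOperator):
    """Calls an HTTP endpoint and writes the response body to a file."""

    def __init__(self, output_path, *args, **kwargs):
        super(HttpToFileOperator, self).__init__(*args, **kwargs)
        self.output_path = output_path

    def execute(self, context):
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)
        response = http.run(self.endpoint, self.data, self.headers)
        # The problem: output_path is fixed when the DAG is defined, so
        # two simultaneous runs of the workflow write to the same file.
        with open(self.output_path, 'w') as f:
            f.write(response.text)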
Upvotes: 2
Views: 3595
Reputation: 751
Airflow does have a feature for operator cross-communication called XCom.
XComs can be “pushed” (sent) or “pulled” (received). When a task pushes an XCom, it makes it generally available to other tasks. Tasks can push XComs at any time by calling the xcom_push() method.
Tasks call xcom_pull() to retrieve XComs, optionally applying filters based on criteria like key, source task_ids, and source dag_id.
To push to XCom, use:
ti.xcom_push(key=<variable name>, value=<variable value>)
To pull an XCom object, use:
myxcom_val = ti.xcom_pull(key=<variable name>, task_ids='<task to pull from>')
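A minimal, self-contained sketch of the hand-off between two tasks (the dag id, task ids and the transform logic are illustrative; Airflow 1.x style with provide_context=True):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def call_endpoint(**context):
    result = '{"status": "ok"}'  # stand-in for the real HTTP response
    # Push the endpoint output for downstream tasks.
    context['ti'].xcom_push(key='endpoint_output', value=result)


def transform(**context):
    # Pull the upstream task's output; no shared files needed.
    raw = context['ti'].xcom_pull(key='endpoint_output',
                                  task_ids='call_endpoint')
    print('got: %s' % raw)


dag = DAG('http_workflow', start_date=datetime(2018, 1, 1),
          schedule_interval=None)

t1 = PythonOperator(task_id='call_endpoint', python_callable=call_endpoint,
                    provide_context=True, dag=dag)
t2 = PythonOperator(task_id='transform', python_callable=transform,
                    provide_context=True, dag=dag)
t1 >> t2

Since XComs are keyed by dag id, task id and execution date, concurrent runs of the same workflow each see only their own values, which removes the need for unique file names.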
With the BashOperator, you just set xcom_push=True and the last line written to stdout is set as the XCom value.
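For example (illustrative snippet assuming a dag object as above; note that newer Airflow versions renamed this parameter to do_xcom_push):

from airflow.operators.bash_operator import BashOperator

produce = BashOperator(task_id='produce_value',
                       bash_command='echo "value-for-next-task"',
                       xcom_push=True,  # last stdout line becomes the XCom
                       dag=dag)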
You can view the XCom object while your task is running by simply opening the task instance in the Airflow UI and clicking on the XCom tab.
Upvotes: 2