Reputation: 787
I'm trying to figure out how best to approach the problem below. Essentially, I have an external API service that I send requests to and get results from.
POST = Send request and the response you get back is a URL which you can use for the GET requests to retrieve your results.
GET = Poll the returned URL from the POST request until you get a successful result.
What would be the best way to approach this in Airflow? My idea is to essentially have 2 tasks running in parallel.
Do you think this is the correct way of going about it? Or should I possibly use the asyncio library in Python?
Any help much appreciated
Thanks,
Upvotes: 5
Views: 4148
Reputation: 4873
You can achieve what you are describing using SimpleHttpOperator and HttpSensor from Airflow (no need to install any extra package).
Consider this example, which uses the http_default connection to httpbin.
The task to perform POST request:
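import json

from airflow.providers.http.operators.http import SimpleHttpOperator

task_post_op = SimpleHttpOperator(
    task_id='post_op',
    # http_conn_id='your_conn_id',
    endpoint='post',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    # Optional: fail the task unless httpbin echoes the payload back
    response_check=lambda response: response.json()['json']['priority'] == 5,
    # The returned value is pushed to XCom; here, the endpoint to poll next
    response_filter=lambda response: 'get',  # e.g. lambda response: json.loads(response.text)
    dag=dag,
)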
By providing response_filter you can manipulate the response result, which will be the value pushed to XCom. In your case, you should return the endpoint you want to poll in the next task.
response_filter: A function allowing you to manipulate the response text. e.g response_filter=lambda response: json.loads(response.text). The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. :type response_filter: A lambda or defined function.
Note that the response_check parameter is optional.
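For a real API, response_filter would need to pull the polling URL out of the POST response instead of returning a fixed string. A minimal sketch, assuming the service replies with JSON containing a result_url field (that field name, the example URL, and the function name extract_poll_endpoint are hypothetical, not something from your API):

```python
from urllib.parse import urlparse


def extract_poll_endpoint(response):
    """Return the endpoint to poll, taken from the POST response body.

    Assumes (hypothetically) a JSON body such as
    {"result_url": "https://api.example.com/jobs/123"}.
    """
    result_url = response.json()["result_url"]
    parsed = urlparse(result_url)
    # Keep only the path and query string; the Airflow connection
    # is expected to supply the scheme and host.
    endpoint = parsed.path.lstrip("/")
    if parsed.query:
        endpoint += "?" + parsed.query
    return endpoint
```

You would then pass response_filter=extract_poll_endpoint to the operator, so that the relative endpoint is the value pushed to XCom.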
The task to perform GET requests:
Use the HttpSensor to poke until the response_check callable evaluates to true.
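from airflow.providers.http.sensors.http import HttpSensor

task_http_sensor_check = HttpSensor(
    task_id='http_sensor_check',
    # http_conn_id='your_conn_id',
    # XComArg: resolves to the value returned by task_post_op
    endpoint=task_post_op.output,
    request_params={},
    # Keep poking until this callable returns True
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=5,  # seconds between attempts
    dag=dag,
)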
As the endpoint parameter, we pass the XCom value pulled from the previous task, using XComArg.
Use poke_interval to define the time in seconds that the sensor should wait between tries.
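For your API, the sensor's response_check would test whether the result is actually ready rather than look for a fixed string. A minimal sketch, assuming the polled URL returns JSON with a status field that becomes "done" when the result is available (the field name, its values, and the function name job_is_finished are hypothetical):

```python
def job_is_finished(response):
    """Return True once the polled result is ready, False to keep poking."""
    # Hypothetical payload: {"status": "pending"} while the job is running,
    # {"status": "done", ...} when the result can be fetched.
    payload = response.json()
    return payload.get("status") == "done"
```

Passing response_check=job_is_finished to the sensor should make it poke again after poke_interval seconds whenever the function returns False.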
Remember to create a Connection of your own defining the base URL, port, etc.
Let me know if that worked for you!
Upvotes: 3