adan11

Reputation: 787

Airflow - How to handle Asynchronous API calls?

I'm trying to figure out how to best approach the below problem. Essentially I have an external API Service that I am sending requests to and getting results for.

POST = Send request and the response you get back is a URL which you can use for the GET requests to retrieve your results.

GET = Poll the returned URL from the POST request until you get a successful result.

What would be the best way to approach this in airflow? My idea is to essentially have 2 tasks running in parallel.

  1. One sending the POST requests and then saving the response URL to XCOM.
  2. The other would run continuously in a while loop, reading new response URLs from the XCOM store and polling them. It would then delete a URL from the XCOM store once it has retrieved a successful result from it.

Do you think this is the correct way of going about it? Or should I use the asyncio library in Python instead?

Any help is much appreciated.

Thanks,

Upvotes: 5

Views: 4148

Answers (1)

NicoE

Reputation: 4873

You can achieve what you are describing using SimpleHttpOperator and HttpSensor from Airflow (no need to install any extra package).

Consider this example, which uses the http_default connection pointing at httpbin.

The task to perform POST request:

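import json

from airflow.providers.http.operators.http import SimpleHttpOperator

# `dag` is assumed to be defined elsewhere in the DAG file.
task_post_op = SimpleHttpOperator(
    task_id='post_op',
    # http_conn_id='your_conn_id',  # defaults to 'http_default'
    endpoint='post',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    # Optional: the task fails unless this check returns True.
    response_check=lambda response: response.json()['json']['priority'] == 5,
    # The return value is pushed to XCom; here it is the endpoint to poll next.
    response_filter=lambda response: 'get',  # e.g. lambda response: json.loads(response.text)
    dag=dag,
)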

By providing response_filter you can manipulate the response result, which will be the value pushed to XCom. In your case, you should return the endpoint you want to poll in the next task.

response_filter: A function allowing you to manipulate the response text, e.g. response_filter=lambda response: json.loads(response.text). The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. Type: a lambda or defined function.

Note that the response_check parameter is optional.
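As a rough sketch of what such a filter could look like for a real API: the function below assumes the POST response is JSON with a key named "result_url" (a hypothetical name, use whatever your API returns). It also assumes the returned URL may be absolute and that the sensor's endpoint is resolved against the connection's base URL, so it keeps only the path and query string. The fake response object is there only so the function can be tried without Airflow.

```python
import json
from types import SimpleNamespace
from urllib.parse import urlsplit


def extract_poll_endpoint(response):
    """Return the endpoint to poll, taken from the POST response body.

    Assumes a JSON body such as {"result_url": "https://host/results/123"};
    the "result_url" key is hypothetical and must match your API.
    """
    result_url = json.loads(response.text)["result_url"]
    parts = urlsplit(result_url)
    # Keep only path and query so the value can be used as a relative endpoint.
    endpoint = parts.path
    if parts.query:
        endpoint += "?" + parts.query
    return endpoint


# Stand-in for a requests.Response, only for trying the function locally.
fake_response = SimpleNamespace(
    text='{"result_url": "https://api.example.com/results/123?format=json"}'
)
endpoint = extract_poll_endpoint(fake_response)
```

You would then pass it to the operator as response_filter=extract_poll_endpoint instead of the lambda.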

The task to perform GET requests:

Use the HttpSensor to poke until the response_check callable evaluates to true.

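from airflow.providers.http.sensors.http import HttpSensor

task_http_sensor_check = HttpSensor(
    task_id='http_sensor_check',
    # http_conn_id='your_conn_id',  # defaults to 'http_default'
    endpoint=task_post_op.output,  # XComArg: the value returned by response_filter
    request_params={},
    # The sensor keeps poking until this returns True.
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=5,  # seconds to wait between pokes
    dag=dag,
)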

As the endpoint parameter we pass the XCom value pushed by the previous task, using XComArg (task_post_op.output). Use poke_interval to define how many seconds the sensor waits between tries.
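If it helps to picture what the sensor does for you, here is a simplified plain-Python illustration of the poke loop. This is not Airflow's actual implementation; poll_until and its parameters are names made up for this sketch, and the "API" is faked so the loop can run without a network or real waiting.

```python
import time


def poll_until(check, fetch, poke_interval=5.0, timeout=60.0, sleep=time.sleep):
    """Call fetch() every poke_interval seconds until check(result) is true."""
    waited = 0.0
    while True:
        result = fetch()
        if check(result):
            return result
        if waited >= timeout:
            raise TimeoutError(f"no successful result after {waited} seconds")
        sleep(poke_interval)
        waited += poke_interval


# Local dry run with a fake API that succeeds on the third call.
responses = iter(["pending", "pending", "done"])
waits = []
result = poll_until(
    check=lambda body: body == "done",
    fetch=lambda: next(responses),
    poke_interval=5,
    sleep=waits.append,  # record the waits instead of actually sleeping
)
```

This is the reason you don't need your own while loop or asyncio here: the sensor already re-runs the GET request and your response_check on each poke.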

Remember to create a Connection of your own defining the base URL, port, etc.
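Besides the UI, one way to define a connection is an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection id, holding a connection URI. A minimal sketch for a local experiment follows; the connection id my_api and the host api.example.com are placeholders I made up. In a real deployment you would normally set the variable in the environment of the scheduler and workers rather than from Python.

```python
import os

# Hypothetical connection id and host; replace both with your own values.
CONN_ID = "my_api"
CONN_URI = "http://api.example.com:8080"

# Airflow looks up connections in variables named AIRFLOW_CONN_<CONN_ID in upper case>.
env_var_name = f"AIRFLOW_CONN_{CONN_ID.upper()}"
os.environ[env_var_name] = CONN_URI
```

The tasks above would then use http_conn_id='my_api'.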

Let me know if that worked for you!

Upvotes: 3
