Ugur

Reputation: 2024

Wrapping synchronous requests into asyncio (async/await)?

I am writing a tool in Python 3.6 that sends requests to several APIs (with various endpoints) and collects their responses to parse and save them in a database.

The API clients that I use have a synchronous version of requesting a URL, for instance they use

urllib.request.Request('...

Or they use Kenneth Reitz' Requests library.

Since my API calls rely on synchronous versions of requesting a URL, the whole process takes several minutes to complete.

Now I'd like to wrap my API calls in async/await (asyncio). I'm using Python 3.6.

All the examples / tutorials that I found want me to change the synchronous URL calls / requests to an async version (for instance aiohttp). Since my code relies on API clients that I haven't written (and can't change), I need to leave that code untouched.

So is there a way to wrap my synchronous requests (blocking code) in async/await to make them run in an event loop?

I'm new to asyncio in Python. This would be a no-brainer in NodeJS. But I can't wrap my head around this in Python.

Update 2023-06-12

Here's how I'd do it in Python 3.9+:

import asyncio
import requests

async def main():
    # asyncio.to_thread runs each blocking requests.get call in a worker thread
    response1 = await asyncio.to_thread(requests.get, 'http://httpbin.org/get')
    response2 = await asyncio.to_thread(requests.get, 'https://api.github.com/events')
    print(response1.text)
    print(response2.text)

asyncio.run(main())
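The two awaits above run one after the other; if you want both requests in flight at the same time, one option (a minimal sketch using the same URLs as above) is to hand both to_thread coroutines to asyncio.gather:

import asyncio
import requests

async def main():
    # both blocking calls run in worker threads concurrently
    response1, response2 = await asyncio.gather(
        asyncio.to_thread(requests.get, 'http://httpbin.org/get'),
        asyncio.to_thread(requests.get, 'https://api.github.com/events'),
    )
    print(response1.text)
    print(response2.text)

asyncio.run(main())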

Upvotes: 18

Views: 8398

Answers (1)

Claude

Reputation: 9980

The solution is to wrap your synchronous code in a thread and run it that way. I used exactly this approach to make my asyncio code run boto3 (note: remove the inline type hints if you're running a Python version older than 3.6):

import asyncio
import functools
import typing

import boto3
import botocore.exceptions

async def get(self, key: str) -> bytes:
    s3 = boto3.client("s3")
    loop = asyncio.get_event_loop()
    try:
        # run the blocking boto3 call in the default executor (a thread pool)
        response: typing.Mapping = \
            await loop.run_in_executor(  # type: ignore
                None, functools.partial(
                    s3.get_object,
                    Bucket=self.bucket_name,
                    Key=key))
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchKey":
            # `base` is my own module providing these exception types
            raise base.KeyNotFoundException(self, key) from e
        elif e.response["Error"]["Code"] == "AccessDenied":
            raise base.AccessDeniedException(self, key) from e
        else:
            raise
    return response["Body"].read()

Note that this works because the vast majority of the time in the s3.get_object() call is spent waiting for I/O, and (generally) Python releases the GIL while waiting for I/O (the GIL is the reason threads in Python are generally not a good way to speed up CPU-bound work).

The first argument to run_in_executor, None, means that we run in the default executor, which is a thread pool. It can make things more explicit, though, to create a ThreadPoolExecutor yourself and pass it in.
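For instance, a minimal sketch (the fetch helper, URLs, and max_workers value are illustrative, not part of the original code) that passes an explicit ThreadPoolExecutor capped at 20 threads:

import asyncio
import concurrent.futures
import functools

import requests

executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)

async def fetch(url: str) -> str:
    loop = asyncio.get_event_loop()
    # run the blocking requests.get call on our explicit thread pool
    response = await loop.run_in_executor(
        executor, functools.partial(requests.get, url, timeout=10))
    return response.text

async def main():
    pages = await asyncio.gather(
        fetch('http://httpbin.org/get'),
        fetch('https://api.github.com/events'))
    for page in pages:
        print(len(page))

asyncio.run(main())

Sizing max_workers here is the knob discussed below: it bounds how many blocking calls can be in flight at once.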

Note that, whereas with pure async I/O you could easily have thousands of connections open concurrently, using a threadpool executor means that each concurrent API call needs its own thread. Once you run out of threads in your pool, new calls won't be scheduled until a thread becomes available. You can obviously raise the number of threads, but this eats up memory; don't expect to be able to go over a couple of thousand.

Also see the Python ThreadPoolExecutor docs for an explanation and some slightly different example code on how to wrap your sync call in async code.

Upvotes: 14
