Brad Solomon

Reputation: 40878

Converting small functions to coroutines

I feel like there is a gap in my understanding of async IO: is there a benefit to wrapping small functions into coroutines, within the scope of larger coroutines? Does doing so help signal the event loop correctly? And does the extent of any such benefit depend on whether the wrapped function is IO-bound or CPU-bound?

Example: I have a coroutine, download(), which:

  1. Downloads JSON-serialized bytes from an HTTP endpoint via aiohttp.
  2. Compresses those bytes via bz2.compress() - which is not in itself awaitable
  3. Writes the compressed bytes to S3 via aioboto3

So parts 1 & 3 use predefined coroutines from those libraries; part 2 does not, by default.

Dumbed-down example:

import bz2
import io
import aiohttp
import aioboto3

async def download(endpoint, bucket_name, key):
    async with aiohttp.ClientSession() as session:
        async with session.request("GET", endpoint, raise_for_status=True) as resp:
            raw = await resp.read()  # payload (bytes)
            # Yikes - isn't it bad to throw a synchronous call into the middle
            # of a coroutine?
            comp = bz2.compress(raw)
            async with (
                aioboto3.session.Session()
                .resource('s3')
                .Bucket(bucket_name)
            ) as bucket:
                await bucket.upload_fileobj(io.BytesIO(comp), key)

As hinted by the comment above, my understanding has always been that dropping a synchronous call like bz2.compress() into a coroutine blocks the event loop for the call's duration. (And bz2.compress() is CPU-bound rather than IO-bound, which presumably makes this worse.)

So, is there generally any benefit to this type of boilerplate?

async def compress(*args, **kwargs):
    return bz2.compress(*args, **kwargs)

(And now comp = await compress(raw) within download().)

Voilà, this is now an awaitable coroutine, because a sole return is valid in a native coroutine. Is there a case to be made for using this?

Per this answer, I've heard justification for sprinkling in await asyncio.sleep(0) in a similar manner, just to signal back to the event loop that the calling coroutine wants a break. Is this right?

Upvotes: 2

Views: 1918

Answers (2)

9000

Reputation: 40884

Coroutines allow you to run things concurrently, not in parallel. They allow for single-threaded cooperative multitasking. This makes sense in two cases:

  • You need to produce results in lockstep, like two generators would.
  • You want something useful to be done while another coroutine is waiting for I/O.

Things like HTTP requests or disk I/O allow other coroutines to run while they are waiting for an operation to complete.

bz2.compress() is synchronous but, I suppose, does release the GIL while it is running. Still, no other coroutine can run during its invocation, because coroutines all share the event loop's thread; other threads, however, would keep running.

If you anticipate a large amount of data to compress, so large that the overhead of running a coroutine is small in comparison, you can use bz2.BZ2Compressor and feed it data in reasonably small blocks (like 128 KB), write the result to a stream (S3 supports streaming, or you can use io.BytesIO), and await asyncio.sleep(0) between compressing blocks to yield control.

This will allow other coroutines to run concurrently with your compression coroutine. The async S3 upload may even be occurring in parallel at the socket level while your compression coroutine is inactive.
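A minimal sketch of that incremental approach (the block size and helper name are illustrative, not prescribed by the answer):

```python
import asyncio
import bz2
import io

BLOCK = 128 * 1024  # "reasonably small blocks", as suggested above

async def compress_chunked(raw: bytes) -> bytes:
    """Compress incrementally, yielding to the event loop between blocks."""
    compressor = bz2.BZ2Compressor()
    out = io.BytesIO()
    for i in range(0, len(raw), BLOCK):
        out.write(compressor.compress(raw[i:i + BLOCK]))
        await asyncio.sleep(0)  # give other coroutines a chance to run
    out.write(compressor.flush())  # flush any internally buffered data
    return out.getvalue()
```

The output is identical to a single bz2.compress() call; only the scheduling differs.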

BTW making your compressor explicitly an async generator can be a simpler way to express the same idea.
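For example, the same idea expressed as an async generator might look like this (a sketch; the chunk handling and names are mine, not from the answer):

```python
import asyncio
import bz2

async def compress_blocks(chunks):
    """Async generator: consume an iterable of byte chunks and yield
    compressed blocks, returning control to the loop between inputs."""
    compressor = bz2.BZ2Compressor()
    for chunk in chunks:
        data = compressor.compress(chunk)
        if data:  # the compressor may buffer internally and return b""
            yield data
        await asyncio.sleep(0)
    tail = compressor.flush()
    if tail:
        yield tail
```

A consumer could then `async for block in compress_blocks(...)` and stream each block straight into a multipart S3 upload.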

Upvotes: 2

user4815162342

Reputation: 154886

So, is there generally any benefit to this type of boilerplate?

async def compress(*args, **kwargs):
    return bz2.compress(*args, **kwargs)

There is no benefit to it whatsoever. Contrary to expectations, adding an await doesn't guarantee that control will be passed to the event loop - that happens only if the awaited coroutine actually suspends. Since compress doesn't await anything, it will never suspend, so it's a coroutine in name only.

Note that adding await asyncio.sleep(0) in coroutines does not solve the problem; see this answer for a more detailed discussion. If you need to run a blocking function, use run_in_executor:

async def compress(*args, **kwargs):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, lambda: bz2.compress(*args, **kwargs))

Upvotes: 5
