Mikhail T.

Reputation: 3997

asyncio only processes one file at a time

I'm working on a program to upload (large) files to a remote SFTP-server, while also calculating each file's SHA256. The uploads are slow, and the program is supposed to open multiple SFTP-connections.

Here is the main code:

async def bwrite(fd, buf):
    log.debug('Writing %d bytes to %s', len(buf), fd)
    fd.write(buf)

async def digester(digest, buf):
    log.debug('Updating digest %s with %d more bytes', digest, len(buf))
    digest.update(buf)

async def upload(fNames, SFTP, rename):

    for fName in fNames:
        inp = open(fName, "rb", 655360)
        log.info('Opened local %s', fName)
        digest = hashlib.sha256()
        rName = rename % os.path.splitext(fName)[0]
        out = SFTP.open(rName, "w", 655360)
...
        while True:
            buf = inp.read(bsize)
            if not buf:
                break
            await bwrite(out, buf)
            await digester(digest, buf)

        inp.close()
        out.close()
...

for i in range(0, len(clients)):
    fNames = args[(i * chunk):((i + 1) * chunk)]
    log.debug('Connection %s: %d files: %s',
        clients[i], len(fNames), fNames)
    uploads.append(upload(fNames, clients[i], Rename))

log.info('%d uploads initiated, awaiting completion', len(uploads))
results = asyncio.gather(*uploads)
loop = asyncio.get_event_loop()
loop.run_until_complete(results)
loop.close()

The idea is for multiple upload coroutines to run "in parallel" -- each using its own separate SFTP-connection -- pushing out one or more files to the server.

It even works -- but only a single upload is running at any time. I expected multiple ones to get control -- while their siblings await the bwrite and/or the digester. What am I doing wrong?

(Using Python-3.6 on FreeBSD-11, if that matters... The program is supposed to run on RHEL7 eventually...)

Upvotes: 0

Views: 303

Answers (1)

ShadowRanger

Reputation: 155438

If no real awaiting is involved, no parallelism can occur. bwrite and digester, while declared async, perform no asynchronous operations (they never await anything, nor launch coroutines or tasks that could be awaited); if you removed async from their definitions and removed the await where they're called, the code would behave identically.
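You can see the effect in isolation with something like this (the time.sleep is just a stand-in for your blocking SFTP write; the names are made up for the demo):

import asyncio
import time

async def pretend_write(n):
    time.sleep(1)      # blocking call: the event loop cannot switch away here
    return n

async def main():
    # Takes ~3 seconds, not ~1: the three coroutines run strictly one
    # after another, because none of them ever awaits real async work.
    await asyncio.gather(pretend_write(1), pretend_write(2), pretend_write(3))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())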

The only time asyncio can get you benefits is when:

  1. There is a blocking operation involved, and
  2. Said blocking operation is designed for asyncio use (or involves a file descriptor that can be rewrapped for said purposes)

Your bwrite isn't doing that: it's doing normal blocking I/O on SFTP objects, which don't appear to be async-friendly (and if the write were actually async, failing to either return the future it produced or await it yourself would usually mean it does nothing, barring the off chance that it self-scheduled a task internally). Your reads from the input file aren't doing it either, which is fine; changing blocking I/O to asyncio for local file access usually isn't beneficial anyway, since it's all buffered at the user and kernel level, so you almost never block for long. Nor is your digester: hashing is CPU-bound, and it never makes sense to make it async unless it's interleaved with actual asynchronous work.

Since the two awaits in upload are effectively synchronous (they don't return anything that, when awaited, would block on genuinely asynchronous work), upload itself is effectively synchronous: it will never, under any circumstances, return control to the event loop before it completes. So even though all the other tasks are in the event loop's queue, raring to go, the event loop itself has to wait until the running task blocks in an async-friendly way (with an await on something that actually does background work while blocked), which never happens, and the tasks just run sequentially, one after the other.
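For contrast, a hypothetical version where the coroutine awaits something the loop can actually suspend on (asyncio.sleep here, standing in for an async-friendly SFTP write) does overlap:

import asyncio

async def pretend_async_write(n):
    await asyncio.sleep(1)   # truly asynchronous: control returns to the loop
    return n

async def main():
    # Takes ~1 second total: while one coroutine is suspended in
    # asyncio.sleep, the loop runs the others.
    await asyncio.gather(*(pretend_async_write(i) for i in range(3)))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())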

If an async-friendly version of your SFTP module exists, that might allow you to gain some benefit. But without it, you're probably better off using concurrent.futures.ThreadPoolExecutor or multiprocessing.pool.ThreadPool to do preemptive multitasking (which swaps out threads whenever they release the GIL, and forcibly swaps between bytecodes if they don't release it for a while). That will get you parallelism on any blocking I/O (async-friendly or not), and, if the data is large enough, on the hashing work as well (hashlib is one of the few Python built-ins I know of that releases the GIL for CPU-bound work when the data to be hashed is large enough; extension modules releasing the GIL is the only way multithreaded CPython can do more than one core's worth of CPU-bound work in a single process, even on multicore systems).
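Roughly, the thread-pool version would look something like this (a sketch only: upload becomes a plain synchronous function with the same body minus async/await, and clients, args, chunk and Rename are the names from your snippet):

from concurrent.futures import ThreadPoolExecutor, as_completed

def upload(fNames, SFTP, rename):
    # same body as before, minus async/await: the blocking reads,
    # SFTP writes and digest updates now run in a worker thread
    ...

with ThreadPoolExecutor(max_workers=len(clients)) as pool:
    futures = [pool.submit(upload,
                           args[(i * chunk):((i + 1) * chunk)],
                           clients[i], Rename)
               for i in range(len(clients))]
    for fut in as_completed(futures):
        fut.result()    # re-raise any exception from the worker thread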

Upvotes: 1
