Reputation: 3997
I'm working on a program to upload (large) files to a remote SFTP-server, while also calculating each file's SHA256. The uploads are slow, so the program is supposed to open multiple SFTP-connections.
Here is the main code:
async def bwrite(fd, buf):
    log.debug('Writing %d bytes to %s', len(buf), fd)
    fd.write(buf)

async def digester(digest, buf):
    log.debug('Updating digest %s with %d more bytes', digest, len(buf))
    digest.update(buf)

async def upload(fNames, SFTP, rename):
    for fName in fNames:
        inp = open(fName, "rb", 655360)
        log.info('Opened local %s', fName)
        digest = hashlib.sha256()
        rName = rename % os.path.splitext(fName)[0]
        out = SFTP.open(rName, "w", 655360)
        ...
        while True:
            buf = inp.read(bsize)
            if not buf:
                break
            await bwrite(out, buf)
            await digester(digest, buf)
        inp.close()
        out.close()

...
for i in range(0, len(clients)):
    fNames = args[(i * chunk):((i + 1) * chunk)]
    log.debug('Connection %s: %d files: %s',
              clients[i], len(fNames), fNames)
    uploads.append(upload(fNames, clients[i], Rename))

log.info('%d uploads initiated, awaiting completion', len(uploads))
results = asyncio.gather(*uploads)
loop = asyncio.get_event_loop()
loop.run_until_complete(results)
loop.close()
The idea is for multiple upload coroutines to run "in parallel" -- each using its own separate SFTP-connection -- pushing out one or more files to the server.
It even works -- but only a single upload is running at any time. I expected multiple ones to get control -- while their siblings await the bwrite and/or the digester. What am I doing wrong?
(Using Python-3.6 on FreeBSD-11, if that matters... The program is supposed to run on RHEL7 eventually...)
Upvotes: 0
Views: 303
Reputation: 155438
If no awaiting is involved, then no parallelism can occur. bwrite and digester, while declared async, perform no async operations (they don't launch or create coroutines that can be awaited); if you removed the async from their definitions and the await where they're called, the code would behave identically.
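You can see the effect with a minimal sketch (illustrative names, not from the question): coroutines that never await anything asyncio-aware run strictly one after another even under asyncio.gather, while coroutines that do await something asyncio-aware overlap.

import asyncio
import time

async def fake_io(name):
    # Declared async, but only does blocking work and never awaits
    # anything that yields to the event loop.
    time.sleep(1)
    print(name, 'done at', time.monotonic())

async def real_io(name):
    # Actually awaits an asyncio-aware operation, so the event loop can
    # run other tasks while this one is waiting.
    await asyncio.sleep(1)
    print(name, 'done at', time.monotonic())

loop = asyncio.get_event_loop()
# Takes ~2 seconds: the coroutines run sequentially.
loop.run_until_complete(asyncio.gather(fake_io('a'), fake_io('b')))
# Takes ~1 second: both tasks wait concurrently inside the event loop.
loop.run_until_complete(asyncio.gather(real_io('a'), real_io('b')))
loop.close()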
The only time asyncio can get you benefits is when the I/O is performed by something designed for asyncio use (or involves a file descriptor that can be rewrapped for said purposes). Your bwrite isn't doing that (it's doing normal blocking I/O on SFTP objects, which don't appear to be async-friendly; if they were async, your failure to either return the future produced, or await it yourself, would usually mean it does nothing, barring the off-chance it self-scheduled a task internally). Your reads from the input file aren't async either (which is fine; normally, changing blocking I/O to asyncio for local file access isn't beneficial; it's all being buffered, at user level and kernel level, so you almost never actually block). Nor is your digester (hashing is CPU-bound; it never makes sense to make it async unless it's handed off to something that actually runs asynchronously).
Since the two awaits in upload are effectively synchronous (they don't return anything that, when awaited, would actually block on real asynchronous tasks), upload itself is effectively synchronous (it will never, under any circumstances, return control to the event loop before it completes). So even though all the other tasks are in the event loop queue, raring to go, the event loop itself has to wait until the running task blocks in an async-friendly way (with await on something that actually does background work while blocked), which never happens, so the tasks just get run sequentially, one after the other.
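One way to make those awaits genuinely asynchronous while staying on asyncio is to push the blocking calls into a thread pool with run_in_executor. A rough sketch, keeping the question's function names but untested against your particular SFTP library:

import asyncio

async def bwrite(fd, buf):
    # Run the blocking SFTP write in the default thread-pool executor, so
    # awaiting it really does hand control back to the event loop.
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, fd.write, buf)

async def digester(digest, buf):
    # Hashing is CPU-bound; hashlib releases the GIL for large buffers,
    # so offloading it to a thread can also overlap with other tasks.
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, digest.update, buf)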
If an async-friendly version of your SFTP module exists, that might allow you to gain some benefit. But without it, you're probably better off using concurrent.futures.ThreadPoolExecutor or multiprocessing.pool.ThreadPool to do preemptive multitasking (which will swap out threads whenever they release the GIL, forcibly switching between bytecodes if they don't release the GIL for a while). That will get you parallelism on any blocking I/O (async-friendly or not) and, if the data is large enough, on the hashing work as well (hashlib is one of the few Python built-ins I know of that releases the GIL for CPU-bound work when the data to be hashed is large enough; extension modules releasing the GIL is the only way multithreaded CPython can do more than one core's worth of CPU-bound work in a single process, even on multicore systems).
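For the thread-based route, a minimal sketch: the question's upload loop rewritten as an ordinary blocking function, with one worker thread per connection. It assumes clients, args, chunk and Rename are set up exactly as in the question, and uses a literal chunk size in place of the question's bsize.

import concurrent.futures
import hashlib
import os

def upload(fNames, SFTP, rename):
    # Same per-connection loop as in the question, but as a plain
    # blocking function -- no async/await needed.
    for fName in fNames:
        inp = open(fName, "rb", 655360)
        digest = hashlib.sha256()
        rName = rename % os.path.splitext(fName)[0]
        out = SFTP.open(rName, "w", 655360)
        while True:
            buf = inp.read(655360)  # pick whatever bsize you were using
            if not buf:
                break
            out.write(buf)       # blocking I/O releases the GIL while it waits
            digest.update(buf)   # hashlib releases the GIL for large buffers
        inp.close()
        out.close()

# One worker thread per SFTP connection; each thread pushes its own
# slice of the file list, so the connections run in parallel.
with concurrent.futures.ThreadPoolExecutor(max_workers=len(clients)) as pool:
    futures = [pool.submit(upload, args[i * chunk:(i + 1) * chunk],
                           clients[i], Rename)
               for i in range(len(clients))]
    for fut in concurrent.futures.as_completed(futures):
        fut.result()  # re-raise any exception from the worker thread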
Upvotes: 1