dragoon

Reputation: 5743

Correct way to parallelize work with asyncio

There are many posts on SO asking specific questions about asyncio, but I cannot grasp the right approach to use for a given situation.

Let's say I want to parse and crawl a number of web pages in parallel. I can do this in at least 3 different ways with asyncio:

With pool.submit:

with ThreadPoolExecutor(max_workers=10) as pool:
    result_futures = list(map(lambda x: pool.submit(my_func, x), my_list))
    for future in as_completed(result_futures):
        results.append(future.result())
return results

With asyncio.gather:

loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [loop.run_in_executor(pool, my_func, x) for x in my_list]
    results = await asyncio.gather(*futures)

With just pool.map:

with ThreadPoolExecutor(max_workers=10) as pool:
    results = [x for x in pool.map(my_func, my_list)]

my_func is something like

async def my_func(arg):
    async with aiohttp.ClientSession() as session:
        async with session.post(...):
            ...

Could somebody help me understand what would be the differences between those 3 approaches? I understand that I can, for example, handle exceptions independently in the first one, but any other differences?

Upvotes: 1

Views: 1346

Answers (1)

jsbueno

Reputation: 110263

None of these. ThreadPoolExecutor and run_in_executor will both execute your code in another thread, regardless of whether you use the asyncio loop to watch for their execution. And at that point you might as well not use asyncio at all: the whole idea of async is to run everything on a single thread, saving some CPU cycles and avoiding a lot of the race conditions that emerge in multi-threaded code.

If your my_func is using async correctly all the way down (it looks like it is, but the code is incomplete), you have to create an asyncio Task for each call to your "async def" function. For that, the shortest path is indeed asyncio.gather:

import asyncio
import aiohttp  # ... and whatever else is used inside "my_func"

async def my_func(x):
    ...

my_list = ...

async def main():
    # gather needs a running event loop, so wrap it in a coroutine
    # for asyncio.run; it creates one Task per coroutine and runs
    # them all concurrently
    return await asyncio.gather(*(my_func(x) for x in my_list))

results = asyncio.run(main())

And that is all there is to it.
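One detail the asker's ThreadPoolExecutor versions had that the plain gather call above does not is the cap of 10 concurrent workers. In pure asyncio the usual equivalent is a Semaphore. This is a minimal sketch, not part of the original code: fetch, main, and the asyncio.sleep stand-in for the real aiohttp request are all illustrative names.

```python
import asyncio

async def fetch(sem, x):
    # the Semaphore caps how many coroutines run this block at once,
    # mirroring max_workers=10 in the ThreadPoolExecutor versions
    async with sem:
        await asyncio.sleep(0)  # stand-in for the real aiohttp call
        return x

async def main(items):
    sem = asyncio.Semaphore(10)
    # gather preserves the input order in its result list
    return await asyncio.gather(*(fetch(sem, x) for x in items))

print(asyncio.run(main(range(5))))  # [0, 1, 2, 3, 4]
```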

Now, going back to your code and checking the differences: your code almost works by chance, in the sense that you really just passed the async function and its parameters to the thread-pool executor. Calling any async function in this way returns immediately, with no work done. That means nothing (apart from some thin boilerplate code used to create the coroutine) is executed in your thread-pool executors. The values returned by the calls that run in the target threads (i.e. the actual my_func(x) calls) are "coroutine objects": these are the objects that have to be awaited in the main thread, and that will actually perform the network I/O. That is: your my_func is a "coroutine function", and when called it returns immediately with a "coroutine object". Only when that coroutine object is awaited is the code inside my_func actually executed.
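That behavior can be seen in a few lines. This is an illustrative sketch (my_coro and main are made-up names, not from the question): calling a coroutine function returns instantly, and the body only runs on await.

```python
import asyncio

async def my_coro():
    # this body runs only when the coroutine object is awaited
    return 42

obj = my_coro()            # returns immediately: nothing has executed yet
print(type(obj).__name__)  # coroutine

async def main():
    return await obj       # only here does the body actually run

print(asyncio.run(main()))  # 42
```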

Now, with that out of the way: in your first snippet you call future.result() on the concurrent.futures Future, and that will just give you the coroutine object, so that code does not work. If you wrote results.append(await future.result()) then, assuming no exceptions during execution, it would work, but it would make all the calls in sequence: await suspends the current task until the awaited object resolves, and since the awaits for the other results happen in that same code, they queue up and execute in order, with zero parallelism.

Your pool.map code does the same, and your asyncio.gather code is wrong in a different way: loop.run_in_executor takes your call, runs it in another thread, and gives you an awaitable object that is suitable for use with gather. However, awaiting it will return the "coroutine object", not the result of the HTTP call.

Your real options for handling the exceptions raised in the parallel code are asyncio.gather, asyncio.wait, or asyncio.as_completed. Check the docs here: https://docs.python.org/3/library/asyncio-task.html
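As one hedged sketch of those options: gather's return_exceptions=True flag delivers exceptions in place in the result list instead of raising the first one, which lets you handle each failure independently. The task function and its simulated failure below are illustrative, not from the question.

```python
import asyncio

async def task(n):
    # stand-in for one crawl; n == 2 simulates a failing request
    if n == 2:
        raise ValueError(f"bad input: {n}")
    await asyncio.sleep(0)
    return n * 10

async def main():
    # return_exceptions=True keeps the other tasks' results instead of
    # cancelling the whole gather on the first failure
    return await asyncio.gather(*(task(n) for n in range(4)),
                                return_exceptions=True)

for r in asyncio.run(main()):
    print("failed:" if isinstance(r, Exception) else "ok:", r)
```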

Upvotes: 2
