Reputation: 85913
Is it possible to take a blocking function such as work and have it run concurrently in a ProcessPoolExecutor that has more than one worker?
import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()

def work():
    # the blocking function
    sleep(1)

async def producer():
    for i in range(num_jobs):
        results = await loop.run_in_executor(executor, work)
        await queue.put(results)

async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1

s = time()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
print("duration", time() - s)
Running the above on a machine with more than 4 cores takes ~4 seconds. How would you write producer such that the above example takes only ~1 second?
Upvotes: 20
Views: 14749
Reputation: 155670
The problem is in producer. Instead of allowing the jobs to run in the background, it waits for each job to finish, thus serializing them. If you rewrite producer to look like this (and leave consumer unchanged), you get the expected 1s duration:
async def producer():
    for i in range(num_jobs):
        fut = loop.run_in_executor(executor, work)
        fut.add_done_callback(lambda f: queue.put_nowait(f.result()))
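A done callback runs as plain synchronous code, so it cannot await queue.put; that is why put_nowait is used (safe here, since the queue is unbounded). For reference, a minimal self-contained sketch of the same approach on modern Python (3.8+), using asyncio.run instead of the question's module-level loop; the main coroutine is a name introduced for this sketch:

import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4

def work():
    sleep(1)  # stands in for the blocking job from the question

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=num_jobs) as executor:
        # schedule every job up front; each callback enqueues its result
        for _ in range(num_jobs):
            fut = loop.run_in_executor(executor, work)
            fut.add_done_callback(lambda f: queue.put_nowait(f.result()))
        # drain the queue once per scheduled job
        for _ in range(num_jobs):
            await queue.get()

if __name__ == "__main__":
    s = time()
    asyncio.run(main())
    print("duration", time() - s)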
Upvotes: 9
Reputation: 46603
await loop.run_in_executor(executor, work) suspends producer until work completes, so only one job runs at a time.
To run the jobs concurrently, you could use asyncio.as_completed:
async def producer():
    # schedule all jobs first, then collect them as they finish
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    for f in asyncio.as_completed(tasks):  # the loop= argument was removed in Python 3.10
        results = await f
        await queue.put(results)
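asyncio.as_completed yields the futures in completion order, so each result is enqueued as soon as its job finishes. If submission order matters instead, a hedged alternative sketch (reusing the question's module-level work, num_jobs, queue, loop, and executor) is to gather the futures:

async def producer():
    # schedule all jobs at once; gather returns results in submission order
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    for results in await asyncio.gather(*tasks):
        await queue.put(results)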
Upvotes: 18