Chris Seymour

Reputation: 85913

Combining asyncio with a multi-worker ProcessPoolExecutor

Is it possible to take a blocking function such as work and have it run concurrently in a ProcessPoolExecutor that has more than one worker?

import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()

def work():
    sleep(1)  # stand-in for some blocking work

async def producer():
    for i in range(num_jobs):
        results = await loop.run_in_executor(executor, work)
        await queue.put(results)

async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1

s = time()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
print("duration", time() - s)

Running the above on a machine with more than 4 cores takes ~4 seconds. How would you write producer such that the above example takes only ~1 second?

Upvotes: 20

Views: 14749

Answers (2)

user4815162342

Reputation: 155670

The problem is in the producer. Instead of allowing the jobs to run in the background, it waits for each job to finish, thus serializing them. If you rewrite producer to look like this (and leave consumer unchanged), you get the expected 1s duration:

async def producer():
    for i in range(num_jobs):
        # Schedule the job without awaiting it, so all jobs are
        # submitted to the pool at once.
        fut = loop.run_in_executor(executor, work)
        # Enqueue the result as soon as the job completes.
        fut.add_done_callback(lambda f: queue.put_nowait(f.result()))
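
If you'd rather avoid the callback (for instance, so that an exception raised by work propagates through a normal await rather than surfacing inside the callback), a minimal sketch of an equivalent producer is below; run_one is a helper name introduced here for illustration:

async def producer():
    async def run_one():
        # Hypothetical helper: await one job and enqueue its result,
        # letting any exception propagate into the task.
        result = await loop.run_in_executor(executor, work)
        await queue.put(result)

    for _ in range(num_jobs):
        # Create all the tasks immediately instead of awaiting each one.
        asyncio.ensure_future(run_one())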

Upvotes: 9

vaultah

Reputation: 46603

await loop.run_in_executor(executor, work) suspends producer until work completes; as a result, only one job runs at a time.

To run jobs concurrently, you could use asyncio.as_completed:

async def producer():
    # Submit every job to the pool up front so they run in parallel.
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    # Consume results in completion order. (The loop= argument that
    # as_completed used to accept was removed in Python 3.10 and is
    # unnecessary here.)
    for f in asyncio.as_completed(tasks):
        results = await f
        await queue.put(results)
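
If you don't need results in completion order, a minimal sketch of the same idea with asyncio.gather, which awaits all the futures and returns their results in submission order:

async def producer():
    # Submit all jobs first so the pool runs them in parallel,
    # then collect every result in one await.
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    for result in await asyncio.gather(*tasks):
        await queue.put(result)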

Upvotes: 18
