Rasmus

Reputation: 77

Python - Combining multiprocessing with asyncio works only sometimes

I would like to combine asyncio and multiprocessing, as I have a task where one part is I/O-bound and another is CPU-bound. I first tried to use loop.run_in_executor(), but I couldn't get it to work properly. Instead, I created two processes, where one uses asyncio and the other doesn't.
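For reference, the usual shape of loop.run_in_executor() with a process pool looks roughly like this. It is a minimal sketch that assumes the CPU-bound step can be written as a module-level function whose arguments and return value can be pickled. cpu_bound, producer, consumer and main are hypothetical names, not taken from the question. Only the parent process runs an event loop, and the CPU-bound calls are handed off to worker processes:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_bound(n: int) -> int:
    # Hypothetical stand-in for the CPU-heavy step. It must be a module-level
    # function so the process pool can pickle it and send it to a worker.
    return sum(i * i for i in range(n))


async def producer(queue: asyncio.Queue, words: list) -> None:
    # Hypothetical stand-in for the I/O-bound step: feeds items into the queue.
    for word in words:
        await queue.put(word)
        await asyncio.sleep(0.01)
    await queue.put(None)  # sentinel: tells the consumer to stop


async def consumer(queue: asyncio.Queue, pool: ProcessPoolExecutor) -> list:
    loop = asyncio.get_running_loop()
    results = []
    while (item := await queue.get()) is not None:
        # Run the CPU-bound call in a worker process; while it runs, the event
        # loop in this process stays free to run the producer.
        result = await loop.run_in_executor(pool, cpu_bound, len(item) * 1000)
        results.append(result)
    return results


async def main() -> list:
    queue = asyncio.Queue()  # created inside the running event loop
    with ProcessPoolExecutor() as pool:
        _, results = await asyncio.gather(
            producer(queue, ["Hello", "World"]),
            consumer(queue, pool),
        )
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this design the worker only executes a plain function, so no event loop or asyncio.Queue ever has to be pickled and sent to another process.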

The code has a class with some non-blocking (async) methods and one blocking method. I use an asyncio.Queue to pass information between the non-blocking parts, and a multiprocessing queue (created through a multiprocessing.Manager) to pass information between the non-blocking and the blocking methods.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time


class TestClass:
    def __init__(self):
        m = mp.Manager()
        self.blocking_queue = m.Queue()

    async def run(self):
        loop = asyncio.get_event_loop()
        self.non_blocking_queue = asyncio.Queue() # asyncio Queue must be declared within event loop
        task1 = loop.create_task(self.non_blocking1())
        task2 = loop.create_task(self.non_blocking2())
        task3 = loop.create_task(self.print_msgs())
        await asyncio.gather(task1, task2)
        task3.cancel()

    def blocking(self):
        i = 0
        while i < 5:
            time.sleep(0.6)
            i += 1
            print("Blocking ", i)
            line = self.blocking_queue.get()
            print("Blocking: ", line)
        print("blocking done")

    async def non_blocking1(self):
        for i in range(5):
            await self.non_blocking_queue.put("Hello")
            await asyncio.sleep(0.4)

    async def non_blocking2(self):
        for i in range(5):
            await self.non_blocking_queue.put("World")
            await asyncio.sleep(0.5)

    async def print_msgs(self):
        while True:
            line = await self.non_blocking_queue.get()
            self.blocking_queue.put(line)
            print(line)


test_class = TestClass()
with ProcessPoolExecutor() as pool:
    pool.submit(test_class.blocking)
    pool.submit(asyncio.run(test_class.run()))
print("done")

About half the time I run this, it works fine and prints the text from both the blocking and the non-blocking queues. The other half, it only prints the results from the non-blocking queue; it looks like the blocking process isn't started at all. It doesn't strictly alternate, either: it might work five times in a row and then fail five times in a row.
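One thing worth checking when a worker seems not to run: ProcessPoolExecutor.submit() returns a Future, and an exception raised in the worker is stored on that future. It is only re-raised when result() is called (or inspected with exception()), so if the futures are discarded, as in the code above, a failed job disappears silently. A minimal sketch that keeps the futures and reports each outcome; work and check_futures are hypothetical names:

```python
from concurrent.futures import ProcessPoolExecutor


def work() -> str:
    # Hypothetical job that succeeds.
    return "started"


def check_futures() -> list:
    """Submit two jobs and record each one's result, or the name of the exception it raised."""
    outcomes = []
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(work),
            # Mirrors pool.submit(asyncio.run(...)): the argument is evaluated in
            # the parent first, so submit() receives None, which the worker
            # then tries to call.
            pool.submit(None),
        ]
        for fut in futures:
            exc = fut.exception()  # waits for the job; None if it succeeded
            outcomes.append(fut.result() if exc is None else type(exc).__name__)
    return outcomes


if __name__ == "__main__":
    print(check_futures())  # ['started', 'TypeError']
```

This does not by itself explain the intermittent behaviour, but it makes any error raised in a worker visible instead of silently lost.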

What might cause this? And is there a better way to combine multiprocessing and asyncio?

Upvotes: 2

Views: 1093

Answers (1)

Sam Mason

Reputation: 16213

Running the async task "inside" the other process works for me, e.g.:

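def runfn(fn):
    # executed in the worker process, so asyncio.run() creates the event loop there
    return asyncio.run(fn())

with ProcessPoolExecutor() as pool:
    pool.submit(test_class.blocking)
    # pass the coroutine function and let the worker call it;
    # pool.submit(asyncio.run(...)) would run it in the parent instead
    pool.submit(runfn, test_class.run)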

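Presumably there's some state inside asyncio/the task that needs to be consistent, or that gets broken, when running in another process. Note also that in the question's code, asyncio.run(test_class.run()) is evaluated in the parent process before pool.submit() is called, so the async part never actually runs in a worker: submit() receives the return value of asyncio.run(), which is None.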
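A self-contained sketch of the same approach, assuming the blocking side only needs to consume items from a Manager queue. blocking, produce, run_async and main are hypothetical names, and the queue is passed as an argument instead of being stored on a class. It also keeps both futures and calls result() on them, so an exception in either worker is re-raised in the parent:

```python
import asyncio
import multiprocessing as mp
import time
from concurrent.futures import ProcessPoolExecutor


def blocking(q, n: int) -> list:
    # Blocking side: plain synchronous code that consumes n items.
    received = []
    for _ in range(n):
        time.sleep(0.05)  # simulated blocking work
        # timeout so a failed producer can't hang this worker forever
        received.append(q.get(timeout=5))
    return received


async def produce(q, words: list) -> None:
    # Async side. q is a Manager queue proxy, so put() is a short blocking IPC call.
    for word in words:
        q.put(word)
        await asyncio.sleep(0.01)


def run_async(q, words: list) -> None:
    # Runs inside the worker process, so asyncio.run() creates the loop there.
    asyncio.run(produce(q, words))


def main() -> list:
    words = ["Hello", "World"] * 3
    # max_workers=2 so both jobs run at the same time; the consumer waits on the producer
    with mp.Manager() as manager, ProcessPoolExecutor(max_workers=2) as pool:
        q = manager.Queue()
        blocking_future = pool.submit(blocking, q, len(words))
        async_future = pool.submit(run_async, q, words)  # pass the function, don't call it
        async_future.result()  # re-raises any exception from the async worker
        return blocking_future.result()


if __name__ == "__main__":
    print(main())
```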

Upvotes: 1
