gatopeich

Reputation: 3818

Multiprocess Queue synchronization with asyncio

I want to gather data from asyncio loops running in sibling processes, using Python 3.7.

Ideally I would use a multiprocessing.JoinableQueue, relying on its join() call for synchronization.

However, its synchronization primitives block the entire event loop (see my partial answer below for an example).

Illustrative prototype:

import multiprocessing


class MP_GatherDict(dict):
    '''A per-process dictionary which can be gathered from a single one'''
    def __init__(self):
        self.q = multiprocessing.JoinableQueue()
        super().__init__()

    async def worker_process_server(self):
        while True:
            self.q.put(dict(self))  # (await?) Put a shallow copy
            self.q.join()           # (await?) Wait for it to be gathered

    async def gather(self):
        all_dicts = []
        while not self.q.empty():
            all_dicts.append(self.q.get())  # (await?) Blocks the event loop as written
            self.q.task_done()
        return all_dicts

Note that the put->get->join->put flow might not work as expected, but this question is really about using multiprocessing primitives in an asyncio event loop.

So the question is: how can I best await multiprocessing primitives from an asyncio event loop?

Upvotes: 1

Views: 3041

Answers (2)

Jylpah

Reputation: 373

This comes a bit late, but here goes.

You need to create an async wrapper around mp.JoinableQueue(), since both get() and put() are blocking calls: they block the thread that runs the event loop, and with it every coroutine on that loop.

There are two approaches for this:

  1. Use threads
  2. Poll with the get_nowait() and put_nowait() methods, calling asyncio.sleep() between attempts.
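Option 1 could look roughly like the sketch below. It assumes Python 3.7+ and a multiprocessing.JoinableQueue; the class name ThreadedAsyncQueue and the demo() coroutine are hypothetical. Each blocking call runs in the loop's default thread pool via loop.run_in_executor(), which also makes join() awaitable, something that get_nowait()/put_nowait() polling cannot offer.

```python
import asyncio
import multiprocessing as mp


class ThreadedAsyncQueue:
    """Hypothetical wrapper: runs the blocking get()/put()/join() of a
    (multiprocessing) JoinableQueue in worker threads."""

    def __init__(self, q, executor=None):
        self._q = q
        # None selects the event loop's default ThreadPoolExecutor
        self._executor = executor

    async def _run(self, func, *args):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, func, *args)

    async def get(self, timeout=None):
        # Equivalent to q.get(block=True, timeout=timeout), off the loop thread
        return await self._run(self._q.get, True, timeout)

    async def put(self, item, timeout=None):
        await self._run(self._q.put, item, True, timeout)

    async def join(self):
        # Waits in a worker thread until every item put has been marked done
        await self._run(self._q.join)

    def task_done(self):
        # Only briefly takes a lock, so it is called directly
        self._q.task_done()


async def demo():
    q = ThreadedAsyncQueue(mp.JoinableQueue())
    await q.put({'a': 1})
    item = await q.get(timeout=2)
    q.task_done()
    await q.join()  # returns at once: every item put was marked done
    return item


if __name__ == '__main__':
    print(asyncio.run(demo()))  # {'a': 1}
```

One caveat of the thread approach: cancelling the awaiting task does not interrupt a thread that is already blocked in get() or join(), so passing a timeout where possible keeps worker threads from being stuck indefinitely.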

I chose option 2 since it is simpler.

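from __future__ import annotations  # lets Queue[T] annotations work on Python 3.7/3.8

from asyncio import sleep
from queue import Empty, Full, Queue
from typing import Generic, TypeVar

T = TypeVar('T')


class AsyncQueue(Generic[T]):
    """Async wrapper for queue.Queue or multiprocessing.(Joinable)Queue.

    Both raise queue.Empty / queue.Full from get_nowait() / put_nowait(),
    so the event loop can poll them without blocking."""

    SLEEP: float = 0.01  # polling interval in seconds

    def __init__(self, queue: Queue[T]):
        self._Q: Queue[T] = queue

    async def get(self) -> T:
        while True:
            try:
                return self._Q.get_nowait()
            except Empty:
                # Nothing available yet: yield to the event loop, then retry
                await sleep(self.SLEEP)

    async def put(self, item: T) -> None:
        while True:
            try:
                self._Q.put_nowait(item)
                return
            except Full:
                # Queue is at maxsize: yield to the event loop, then retry
                await sleep(self.SLEEP)

    def task_done(self) -> None:
        # task_done() does not wait, so it can be called directly
        self._Q.task_done()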
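To check that polling keeps the loop responsive, here is a self-contained sketch. poll_get() is a hypothetical standalone version of AsyncQueue.get() above, and loop.call_later() stands in for a sibling process that puts an item onto the queue after 0.5 s; a ticker task counts how often the loop gets to run meanwhile.

```python
import asyncio
import multiprocessing as mp
import queue


async def poll_get(q, interval=0.01):
    """Hypothetical standalone equivalent of AsyncQueue.get(): try
    get_nowait() and yield to the event loop between attempts."""
    while True:
        try:
            return q.get_nowait()
        except queue.Empty:  # multiprocessing queues raise queue.Empty too
            await asyncio.sleep(interval)


async def main():
    q = mp.JoinableQueue()
    loop = asyncio.get_running_loop()
    # Stand-in for a sibling process: put an item 0.5 s from now
    loop.call_later(0.5, q.put, {'data': 42})

    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.1)

    t = loop.create_task(ticker())
    item = await poll_get(q)  # waits without blocking the loop
    q.task_done()
    t.cancel()
    return item, ticks


if __name__ == '__main__':
    item, ticks = asyncio.run(main())
    print(item, 'after', ticks, 'ticker iterations')
```

Polling also tolerates a documented multiprocessing quirk: right after put(), there may be a short delay before get_nowait() stops raising queue.Empty. The price is up to one polling interval of extra latency plus periodic wake-ups while the queue is idle.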

Upvotes: 1

gatopeich

Reputation: 3818

This test shows that multiprocessing.Queue.get() blocks the whole event loop:

import asyncio
import multiprocessing as mp
import os

mp_q = mp.JoinableQueue()

async def mp_queue_wait():
    try:
        # Blocking get() with a 2 s timeout, called directly in the coroutine
        print('Queue:', mp_q.get(timeout=2))
    except Exception as ex:
        print('Queue:', repr(ex))

async def main_loop_task():
    task = asyncio.get_running_loop().create_task(mp_queue_wait())
    for i in range(3):
        print(i, os.times())
        await asyncio.sleep(1)
    await task
    print(repr(task))

asyncio.run(main_loop_task())

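Its output is below. Note that the elapsed value jumps by 2 seconds between iterations 0 and 1: the blocking get() stalls the loop for its whole 2 s timeout.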

0 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208620.18)
Queue: Empty()
1 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208622.18)
2 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208623.18)
<Task finished coro=<mp_queue_wait() done,...> result=None>

So I am looking at asyncio's loop.run_in_executor() as the next possible answer; however, spawning an executor thread just for this seems like overkill.

Here is the same test using the default executor:

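async def mp_queue_wait():
    try:
        # Run the blocking mp_q.get(block=True, timeout=2) in the loop's
        # default executor, so the event loop keeps running while it waits
        result = await asyncio.get_running_loop().run_in_executor(None, mp_q.get, True, 2)
    except Exception as ex:
        result = ex
    print('Queue:', repr(result))
    return result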

And the (desired) result, where the loop keeps ticking once per second while get() waits in a worker thread:

0 posix.times_result(user=0.36, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210674.65)
1 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210675.65)
Queue: Empty()
2 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210676.66)
<Task finished coro=<mp_queue_wait() done, defined at /home/apozuelo/Documents/5G_SBA/Tera5G/services/db.py:211> result=Empty()>
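Building on the run_in_executor() version above, here is a rough sketch of the question's put, join, get flow across real sibling processes. All names (in_thread, worker_loop, worker, gather, main) and the {'worker': n} snapshot are hypothetical stand-ins, and the sketch assumes the gatherer knows how many workers to expect. Note that JoinableQueue.join() waits until every item put by any process has been marked done, not just the caller's own item.

```python
import asyncio
import multiprocessing as mp


async def in_thread(func, *args):
    """Run a blocking call in the loop's default executor (a reused
    thread pool), so the event loop keeps running meanwhile."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def worker_loop(q, n):
    snapshot = {'worker': n}                # stand-in for this process's dict
    await in_thread(q.put, dict(snapshot))  # put a shallow copy
    await in_thread(q.join)                 # wait until the gatherer is done


def worker(q, n):
    # Each sibling process runs its own asyncio event loop
    asyncio.run(worker_loop(q, n))


async def gather(q, expected, timeout=5.0):
    """Collect `expected` items, marking each one done for join()."""
    collected = []
    for _ in range(expected):
        collected.append(await in_thread(q.get, True, timeout))
        q.task_done()
    return collected


async def main(n_workers=3):
    q = mp.JoinableQueue()
    procs = [mp.Process(target=worker, args=(q, i)) for i in range(n_workers)]
    for p in procs:
        p.start()
    collected = await gather(q, n_workers)
    for p in procs:
        await in_thread(p.join)
    return sorted(d['worker'] for d in collected)


if __name__ == '__main__':
    print(asyncio.run(main()))  # [0, 1, 2]
```

On the overkill concern: the default executor is a ThreadPoolExecutor that the loop creates once and then reuses, so each awaited call borrows an existing worker thread rather than starting a new one.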

Upvotes: 1
