Reputation: 3818
I want to gather data from asyncio loops running in sibling processes with Python 3.7. Ideally I would use a multiprocess.JoinableQueue, relying on its join() call for synchronization. However, its synchronization primitives block the event loop in full (see my partial answer below for an example).
Illustrative prototype:
import multiprocess

class MP_GatherDict(dict):
    '''A per-process dictionary which can be gathered from a single one'''

    def __init__(self):
        self.q = multiprocess.JoinableQueue()
        super().__init__()

    async def worker_process_server(self):
        while True:
            (await?) self.q.put(dict(self))  # Put a shallow copy
            (await?) self.q.join()           # Wait for it to be gathered

    async def gather(self):
        all_dicts = []
        while not self.q.empty():
            all_dicts.append(await self.q.get())
            self.q.task_done()
        return all_dicts
Note that the put->get->join->put flow might not work as expected, but this question really is about using multiprocess primitives in an asyncio event loop...
The question would then be how best to await multiprocess primitives from an asyncio event loop?
Upvotes: 1
Views: 3041
Reputation: 373
This comes a bit late, but:
You need to create an async wrapper around the mp.JoinableQueue(), since both get() and put() block the calling thread and, with it, the whole event loop. There are two approaches for this: run the blocking calls in an executor, or poll with asyncio.sleep() and the non-blocking get_nowait()/put_nowait() methods. I chose the second option since it is easy.
from __future__ import annotations  # lets Queue[T] be used as an annotation on Python 3.7

from asyncio import sleep
from queue import Empty, Full, Queue
from typing import Generic, TypeVar

T = TypeVar('T')

class AsyncQueue(Generic[T]):
    """Async wrapper for queue.Queue"""

    SLEEP: float = 0.01

    def __init__(self, queue: Queue[T]):
        self._Q: Queue[T] = queue

    async def get(self) -> T:
        # Poll the non-blocking getter, yielding to the event loop while the queue is empty
        while True:
            try:
                return self._Q.get_nowait()
            except Empty:
                await sleep(self.SLEEP)

    async def put(self, item: T) -> None:
        # Poll the non-blocking setter, yielding to the event loop while the queue is full
        while True:
            try:
                self._Q.put_nowait(item)
                return None
            except Full:
                await sleep(self.SLEEP)

    def task_done(self) -> None:
        # task_done() never blocks, so it is passed through unchanged
        self._Q.task_done()
        return None
Upvotes: 1
Reputation: 3818
This test shows that multiprocess.Queue.get() blocks the whole event loop:
import asyncio
import multiprocess as mp
import os

mp_q = mp.JoinableQueue()

async def mp_queue_wait():
    try:
        # Blocking get() called directly inside a coroutine
        print('Queue:', mp_q.get(timeout=2))
    except Exception as ex:
        print('Queue:', repr(ex))

async def main_loop_task():
    task = asyncio.get_running_loop().create_task(mp_queue_wait())
    for i in range(3):
        print(i, os.times())
        await asyncio.sleep(1)
    await task
    print(repr(task))

asyncio.run(main_loop_task())
Whose output is:
0 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208620.18)
Queue: Empty()
1 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208622.18)
2 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208623.18)
<Task finished coro=<mp_queue_wait() done,...> result=None>
So I am looking at asyncio.loop.run_in_executor() as the next possible answer; however, spawning an executor/thread just for this seems like overkill...
Here is the same test using the default executor:
async def mp_queue_wait():
    try:
        # Run the blocking get(block=True, timeout=2) in the default executor
        result = await asyncio.get_running_loop().run_in_executor(None, mp_q.get, True, 2)
    except Exception as ex:
        result = ex
    print('Queue:', repr(result))
    return result
And the (desired) result:
0 posix.times_result(user=0.36, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210674.65)
1 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210675.65)
Queue: Empty()
2 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210676.66)
<Task finished coro=<mp_queue_wait() done, defined at /home/apozuelo/Documents/5G_SBA/Tera5G/services/db.py:211> result=Empty()>
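Building on this, a sketch of how the same run_in_executor() pattern could back the calls marked (await?) in the prototype above; the in_executor() helper and the worker_process_server() signature here are illustrative, not part of asyncio or of the original code:

import asyncio

async def in_executor(func, *args):
    # Push a blocking multiprocess-queue call onto the default ThreadPoolExecutor
    # so that the event loop keeps running while it waits.
    return await asyncio.get_running_loop().run_in_executor(None, func, *args)

async def worker_process_server(q, data):
    await in_executor(q.put, dict(data))  # put a shallow copy without blocking the loop
    await in_executor(q.join)             # wait until the gatherer has called task_done()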
Upvotes: 1