Reputation: 103
I am looking for the best way to communicate between async tasks and methods/functions that run in a thread pool executor from concurrent.futures. In previous synchronous projects, I would use the queue.Queue class. I assume that any such method has to be thread-safe, and therefore asyncio.Queue will not work.
I have seen people extend the queue.Queue class to do something like:
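import asyncio
from queue import Queue

class async_queue(Queue):
    async def aput(self, item):
        self.put_nowait(item)

    async def aget(self):
        # The blocking get() runs in the default thread pool executor.
        resp = await asyncio.get_event_loop().run_in_executor(None, self.get)
        return resp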
Is there a better way?
Upvotes: 10
Views: 5752
Reputation: 251
The disadvantage of the solution via run_coroutine_threadsafe() given in the other answer is that a worker thread will actually depend on your event loop: the thread will wait for the event loop to process your put() and get(). If your event loop is doing a lot of work, this can lead to very long waits, up to seconds of real time.
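To make the dependency concrete, here is a minimal sketch using only the standard library. The names noop, worker and main, and the 0.3 second busy period, are made up for illustration. A plain thread submits a trivial coroutine with run_coroutine_threadsafe() while the loop thread is deliberately blocked, and the thread cannot get its result until the loop is free again.

```python
import asyncio
import threading
import time


async def noop():
    return "done"


def worker(loop, out):
    # Runs in a plain thread. result() returns only once the event loop
    # has had a chance to run the submitted coroutine.
    start = time.monotonic()
    out["result"] = asyncio.run_coroutine_threadsafe(noop(), loop).result()
    out["waited"] = time.monotonic() - start


async def main(busy_seconds=0.3):
    loop = asyncio.get_running_loop()
    out = {}
    thread = threading.Thread(target=worker, args=(loop, out))
    thread.start()
    # Deliberately block the loop thread to simulate a busy event loop.
    time.sleep(busy_seconds)
    # Join the thread without blocking the loop, so the coroutine can run.
    await loop.run_in_executor(None, thread.join)
    return out


if __name__ == "__main__":
    info = asyncio.run(main())
    print(f"worker waited {info['waited']:.2f}s for a no-op coroutine")
```

The wait measured in the worker is roughly the time the loop was busy, even though the coroutine itself does nothing.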
I suggest using aiologic.Queue (I'm the creator of aiologic), which doesn't have this disadvantage: wherever you call put() or get(), the thread or task will only wait if it's really necessary (e.g. if you call get() when the queue is empty).
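from threading import Timer
import aiologic

# The lines below must run inside a coroutine, since they use await.
queue = aiologic.Queue(["first", "second"], maxsize=3)
Timer(1, queue.green_put, ["third"]).start()  # put from a timer thread
print(await queue.async_get())  # "first"
print(await queue.async_get())  # "second"
print(await queue.async_get())  # after one second: "third"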
And if you don't need an upper bound on the number of items that can be placed in the queue, try aiologic.SimpleQueue. It gives the best performance when the queue is rarely empty and there are many consumers (and of course, nothing prevents you from using this queue in other cases).
Upvotes: 2
Reputation: 154866
I would recommend going the other way around: using the asyncio.Queue class to communicate between the two worlds. This has the advantage of not having to spend a slot in the thread pool on operations that take a long time to complete, such as a get().
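For contrast, the cost being avoided can be sketched as follows. This is an illustration only: the single-worker pool and the name main are assumptions chosen to make the effect visible. A blocking queue.Queue.get() submitted through run_in_executor() holds the pool's only thread, so unrelated work queued behind it cannot start until an item arrives.

```python
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor


async def main():
    loop = asyncio.get_running_loop()
    pool = ThreadPoolExecutor(max_workers=1)  # one slot, to make the effect obvious
    q = queue.Queue()

    # The blocking get() occupies the only worker thread until an item arrives.
    get_future = loop.run_in_executor(pool, q.get)
    # Unrelated work is queued behind it and cannot start yet.
    other = loop.run_in_executor(pool, lambda: "other work")

    await asyncio.sleep(0.2)
    starved = not other.done()  # still waiting for a free slot

    q.put("item")  # unblocks the get(), which frees the slot
    item = await get_future
    result = await other
    pool.shutdown(wait=True)
    return starved, item, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With asyncio.Queue the waiting happens inside the event loop instead, so no pool thread is tied up while a consumer waits.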
Here is an example:
import asyncio

class Queue:
    def __init__(self):
        # Must be constructed inside the running event loop.
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def sync_put_nowait(self, item):
        # call_soon() is not thread-safe; other threads must use call_soon_threadsafe().
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)

    def sync_put(self, item):
        asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()

    def sync_get(self):
        return asyncio.run_coroutine_threadsafe(self._queue.get(), self._loop).result()

    def async_put_nowait(self, item):
        self._queue.put_nowait(item)

    async def async_put(self, item):
        await self._queue.put(item)

    async def async_get(self):
        return await self._queue.get()
The methods prefixed with sync_ are meant to be invoked by sync code (running outside the event loop thread). The ones prefixed with async_ are to be called by code running in the event loop thread, regardless of whether they are actually coroutines. (put_nowait, for example, is not a coroutine, but it still needs separate sync and async versions, because the sync one has to hand the call over to the event loop thread.)
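A rough usage sketch, assuming a producer that runs in a ThreadPoolExecutor and a consumer that runs as a coroutine. BridgeQueue is a trimmed copy of the class above so that the snippet runs on its own; produce, main and the None end-of-stream sentinel are made up for this illustration.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class BridgeQueue:
    """Trimmed copy of the Queue class above: only the two methods used here."""

    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def sync_put(self, item):
        asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()

    async def async_get(self):
        return await self._queue.get()


def produce(queue, count):
    # Blocking producer: runs in a pool thread, never in the event loop thread.
    for i in range(count):
        queue.sync_put(i * i)
    queue.sync_put(None)  # hypothetical end-of-stream sentinel


async def main(count=5):
    queue = BridgeQueue()
    loop = asyncio.get_running_loop()
    received = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        producer = loop.run_in_executor(pool, produce, queue, count)
        # The consumer waits inside the event loop, not in a pool thread.
        while (item := await queue.async_get()) is not None:
            received.append(item)
        await producer  # surface any exception raised in the thread
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that the queue has to be created inside a running event loop, because the constructor calls asyncio.get_running_loop().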
Upvotes: 12