Reputation: 3740
Let's assume I have the following code:
import asyncio
import threading
import time

queue = asyncio.Queue()

def threaded():
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

async def async_func():  # "async" itself is a reserved word since Python 3.7
    while True:
        item = await queue.get()
        print(item)

loop = asyncio.get_event_loop()
loop.create_task(async_func())
threading.Thread(target=threaded).start()
loop.run_forever()
The problem with this code is that the loop inside the async coroutine never finishes its first iteration, while the queue size keeps increasing.
Why does this happen, and what can I do to fix it?
I can't get rid of the separate thread, because in my real code I use a separate thread to communicate with a serial device, and I haven't found a way to do that using asyncio.
Upvotes: 31
Views: 34333
Reputation: 261
There's already a short answer, but for those who are curious, I'll give the long one.
As we all know, asyncio synchronization primitives and asyncio queues (communication primitives) are not thread-safe/thread-aware. In fact, they aren't because of the following three limitations:
1. No locking. In a multithreaded environment, state changes must be protected by a lock (such as threading.Lock) so that concurrent threads couldn't break anything. But asyncio doesn't do this, and this can lead to unexpected or erroneous behavior in a multithreaded environment. For example, if two threads call get_nowait() with a single-element queue, they can both pass the check that the queue is not empty, but only the first can get a queue item - the second will raise an IndexError. This is why you shouldn't use asyncio primitives in such an environment even if everything seems to work at first glance: what if your code unexpectedly hangs, raises an exception on Monday night, or even drops the Internet?
2. Single-threaded notifications. Asyncio primitives use the set_result() method of futures to wake up tasks, and that one uses the call_soon() method, which only works in the current thread.
3. Event loop wakeup. If the event loop is waiting for something, it won't resume until that something is ready and notifies it properly. It can be reading a file, writing to a socket, or the eternal sleep of the library you are using - all the things that sleeping tasks usually wait for. If the event loop is not waiting for anything, it may react to the call_soon() call, but otherwise, as you can see, you're out of luck.
The first limitation is the main one when talking about thread-safety. Even if you overcome the first one, because of the next two, primitives will still not actually support threads. Only when all three limitations are overcome can primitives be considered thread-aware.
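As a minimal sketch (not part of the original answer) of how the second and third limitations are usually worked around in practice: call_soon_threadsafe() both schedules the callback on the loop's own thread and wakes the loop from its selector via the self-pipe:

```python
import asyncio
import threading
import time

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def from_thread():
        time.sleep(0.2)
        # Calling fut.set_result() directly from a foreign thread is unsafe;
        # call_soon_threadsafe() schedules it on the loop's thread *and*
        # wakes the loop from its selector.
        loop.call_soon_threadsafe(fut.set_result, "woken")

    threading.Thread(target=from_thread).start()
    return await fut  # the loop sleeps in its selector until woken

result = asyncio.run(main())
print(result)  # woken
```

With a plain call_soon() from the thread, the callback would only run once something else happened to wake the loop.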
Now, with enough knowledge, we can talk about what attempts have been made to fix the problem.
The solution via asyncio.Queue + _write_to_self() is the simplest proposed solution, but it is also the worst, and not even because it uses an undocumented API. This solution overcomes only the third limitation - the put_nowait() method is still not thread-safe. As a result, the non-atomic incrementing of the number of unfinished tasks may break when there is more than one putter. You will also have to ensure that you call get() before the thread calls put_nowait(), because otherwise, due to a race condition, the thread may wake up the next waiting task before your task adds its future, and thus the thread will not actually wake anyone up - your task will hang.
The solution via asyncio.Queue + call_soon_threadsafe() overcomes the first and third limitations, but put_nowait() will only actually take effect when the event loop handles the corresponding callback: no thread will ever have up-to-date information about the state of the queue. You will also lose checks, and exceptions will be raised in the event loop.
The solution via asyncio.Queue/queue.Queue + run_coroutine_threadsafe() (partially presented here) overcomes the first and third limitations, but without waiting for the future it comes down to call_soon_threadsafe(), and with waiting your thread will also depend on the event loop: if it gets sick, your thread will too.
The solution via queue.Queue + call_soon_threadsafe() overcomes all three limitations, but it does not support cancellation, so in case of a timeout the next waiting task may not be woken up. It also does not handle the case where the event loop is closed before calling put(), which may raise a RuntimeError.
The solution via queue.Queue + run_in_executor() (not presented here) overcomes all three limitations, but does not support cancellation because threads cannot be cancelled. As a result, if you try to cancel, you will lose an item from the queue, get a thread leak, or even exhaust the limit on the number of executor threads running concurrently - your application will hang.
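For completeness, the run_in_executor() variant described as "not presented here" can be sketched roughly like this (bounded loops added so the example terminates); the blocking get() is pushed into a worker thread so the event loop stays free:

```python
import asyncio
import queue
import threading
import time

sync_queue = queue.Queue()  # a plain thread-safe queue

def producer():
    for i in range(3):
        time.sleep(0.1)
        sync_queue.put(i)  # ordinary thread-safe put, no event loop involved

async def consumer():
    loop = asyncio.get_running_loop()
    items = []
    for _ in range(3):
        # The blocking get() runs in an executor thread. Cancelling this
        # await does NOT cancel the underlying get() - that is exactly the
        # cancellation caveat described above.
        items.append(await loop.run_in_executor(None, sync_queue.get))
    return items

threading.Thread(target=producer).start()
result = asyncio.run(consumer())
print(result)  # [0, 1, 2]
```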
As you can see, no classic solution is production-ready. As for libraries, they have problems too, because they actually adopt the approaches described above:
aioprocessing works via threads, so it is actually multiprocessing.Queue + run_in_executor().
janus uses tasks for notifications and overcomes the first and third limitations. It supports cancellation, and changes are visible immediately. But because of this classic architecture, it has performance issues, and you need to close the queue when you are done with it.
I have shown that all popular solutions have their downsides. Is that all? No, because I want to offer you my unpopular solutions. I'm the creator of two packages designed to solve this and many other problems.
The most relevant is the culsans package. It provides queues similar to janus, but with good performance and additional features. It includes all the benefits of janus, but also overcomes the second limitation, guarantees queue fairness, works not only with asyncio, and more. It doesn't create tasks, so culsans queues don't need to be closed.
import asyncio
import threading
import time

import culsans

queue = culsans.Queue()

def threaded():
    while True:
        time.sleep(2)
        queue.sync_q.put_nowait(time.time())
        print(queue.sync_q.qsize())

async def async_func():
    while True:
        item = await queue.async_q.get()
        print(item)

threading.Thread(target=threaded).start()
asyncio.run(async_func())
The aiologic package does not provide API-compatible interfaces, but it does provide a number of different synchronization primitives, such as the locks, semaphores, and barriers you know. It has two features that culsans doesn't: it doesn't use synchronization primitives (your event loop will never block), and each asynchronous call performs at most one context switch (except for methods like Condition.__await__()). The price of supporting these two features is the inability to combine synchronous and asynchronous calls in the same thread without risk of deadlock (with culsans you can), a non-standard interface, and some implementation specifics. But if your application is performance-sensitive, it may be the best solution for you.
import asyncio
import threading
import time

import aiologic

queue = aiologic.SimpleQueue()

def threaded():
    while True:
        time.sleep(2)
        queue.put(time.time())
        print(len(queue))

async def async_func():
    while True:
        item = await queue.async_get()
        print(item)

threading.Thread(target=threaded).start()
asyncio.run(async_func())
Upvotes: 2
Reputation: 94961
asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus, which is a third-party library that provides a thread-aware asyncio queue.
import asyncio
import threading
import time

import janus

def threaded(squeue):
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

async def async_func(aqueue):
    while True:
        item = await aqueue.get()
        print(item)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)  # older janus; newer versions take no loop argument
loop.create_task(async_func(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()
There is also aioprocessing (full disclosure: I wrote it), which provides process-safe (and as a side-effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.
Edit
As pointed out in other answers, for simple use-cases you can use loop.call_soon_threadsafe to add to the queue, as well.
Upvotes: 36
Reputation: 455
What about just using threading.Lock with asyncio.Queue?
import asyncio
import queue
import threading

class ThreadSafeAsyncFuture(asyncio.Future):
    """ asyncio.Future is not thread-safe
    https://stackoverflow.com/questions/33000200/asyncio-wait-for-event-from-other-thread
    """
    def set_result(self, result):
        func = super().set_result
        call = lambda: func(result)
        self._loop.call_soon_threadsafe(call)  # Warning: self._loop is undocumented

class ThreadSafeAsyncQueue(queue.Queue):
    """ asyncio.Queue is not thread-safe, queue.Queue is not awaitable
    works only with one putter to unlimited-size queue and with several getters
    TODO: add maxsize limits
    TODO: make put a coroutine
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()
        self.loop = asyncio.get_event_loop()
        self.waiters = []

    def put(self, item):
        with self.lock:
            if self.waiters:
                self.waiters.pop(0).set_result(item)
            else:
                super().put(item)

    async def get(self):
        with self.lock:
            if not self.empty():
                return super().get()
            fut = ThreadSafeAsyncFuture()
            self.waiters.append(fut)
        # Await outside the lock; otherwise put() from another thread
        # could never acquire it and would deadlock.
        result = await fut
        return result
See also - asyncio: Wait for event from other thread
Upvotes: 3
Reputation: 2448
If you do not want to use another library, you can schedule a coroutine from the thread. Replacing the queue.put_nowait with the following works fine.
asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
The variable loop represents the event loop in the main thread.
EDIT:
The reason why your async coroutine is not doing anything is that the event loop never gives it a chance to do so. The queue object is not thread-safe, and if you dig through the CPython code you'll find that this means put_nowait wakes up consumers of the queue through the use of a future with the call_soon method of the event loop. If we could make it use call_soon_threadsafe, it should work. The major difference between call_soon and call_soon_threadsafe, however, is that call_soon_threadsafe wakes up the event loop by calling loop._write_to_self(). So let's call it ourselves:
import asyncio
import threading
import time

queue = asyncio.Queue()

def threaded():
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        # Undocumented internals; recent Python versions no longer store
        # the loop on the queue this way, so this hack may not work there.
        queue._loop._write_to_self()
        print(queue.qsize())

async def async_func():
    while True:
        item = await queue.get()
        print(item)

loop = asyncio.get_event_loop()
loop.create_task(async_func())
threading.Thread(target=threaded).start()
loop.run_forever()
Then, everything works as expected.
As for the thread-safety aspect of accessing shared objects, asyncio.Queue uses collections.deque under the hood, which has thread-safe append and popleft. Admittedly, the check that the queue is not empty and the subsequent popleft are not atomic together, but if you consume the queue only in one thread (the one running the event loop) it could be fine.
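To illustrate, here is a hypothetical stress test (relying on CPython's per-operation atomicity for deque, and using the empty-check-in-one-consumer-thread pattern just described):

```python
import threading
from collections import deque

d = deque()
N = 100_000

def producer():
    for i in range(N):
        d.append(i)  # a single append is thread-safe in CPython

def consumer(out):
    got = 0
    while got < N:
        try:
            out.append(d.popleft())  # a single popleft is thread-safe too
            got += 1
        except IndexError:
            pass  # deque momentarily empty; busy-retry (fine for a demo)

out = []
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=(out,))
t1.start(); t2.start()
t1.join(); t2.join()
print(len(out) == N and out == sorted(out))  # True: nothing lost or duplicated
```

Note that the EAFP try/except replaces the non-atomic "check not empty, then pop" pair, which is exactly why a single consumer thread stays safe here.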
The other proposed solutions, loop.call_soon_threadsafe from Huazuo Gao's answer and my asyncio.run_coroutine_threadsafe, are just doing this: waking up the event loop.
Upvotes: 25
Reputation: 1733
BaseEventLoop.call_soon_threadsafe is at hand. See the asyncio docs for details.
Simply change your threaded() like this:
def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))
Here's a sample output:
0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943
Upvotes: 14