Aleksandr Kovalev

Reputation: 3740

Is there a way to use asyncio.Queue in multiple threads?

Let's assume I have the following code:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async_func():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async_func())
threading.Thread(target=threaded).start()
loop.run_forever()

The problem with this code is that the loop inside the async coroutine never finishes its first iteration, while the queue size keeps increasing.

Why is this happening, and what can I do to fix it?

I can't get rid of the separate thread, because in my real code I use it to communicate with a serial device, and I haven't found a way to do that with asyncio.

Upvotes: 31

Views: 34333

Answers (5)

Ilya Egorov

Reputation: 261

There's already a short answer, but for those who are curious, I'll give the long one.


As we all know, asyncio synchronization primitives and asyncio queues (communication primitives) are not thread-safe or thread-aware. They aren't because of the following three limitations:

  1. They are designed exclusively for single-threaded execution and thus are not protected against race conditions. In a normal multithreaded application, you would make access to your object's state exclusive (e.g., via threading.Lock) so that concurrent threads couldn't break anything. asyncio doesn't do this, which can lead to unexpected or erroneous behavior in a multithreaded environment. For example, if two threads call get_nowait() on a single-element queue, both can pass the check that the queue is not empty, but only the first will actually get an item - the second will raise an IndexError. This is why you shouldn't use asyncio primitives in such an environment even if everything seems to work at first glance: what if your code unexpectedly hangs or raises an exception on Monday night, or even drops the Internet?
  2. They are designed exclusively for a single event loop. Each asyncio primitive stores a reference to the event loop in which it was first used (prior to Python 3.10, the loop in which it was created) and raises an exception if you try to use it with another event loop. This is why you can't get by with asynchronous methods alone: an event loop can only run in one thread.
  3. They use the set_result() method of futures to wake up tasks, and that method uses call_soon(), which is only safe to call from the loop's own thread. In other words, if the event loop is waiting for something, it won't resume until that something is ready and notifies it properly. That something can be a file read, a socket write, or an eternal sleep inside the library you are using - all the things that sleeping tasks usually wait for. If the event loop happens not to be waiting for anything, it may react to the call_soon() call, but as you can see, you're relying on luck.

The first limitation is the main one when talking about thread safety. But even if you overcome it, the next two mean the primitives still won't actually support threads. Only when all three limitations are overcome can primitives be considered thread-aware.


Now, with this background, we can look at the attempts that have been made to fix the problem.

The solution via asyncio.Queue + _write_to_self() is the simplest proposed solution, but it is also the worst, and not even because it uses an undocumented API. It overcomes only the third limitation - the put_nowait() method is still not thread-safe. As a result, the non-atomic incrementing of the number of unfinished tasks may misbehave when there is more than one putter. You also have to ensure that get() is called before the thread calls put_nowait(): otherwise, due to a race condition, the thread may wake up the next waiting task before your task adds its future, so the thread will not actually wake up anyone and your task will hang.

The solution via asyncio.Queue + call_soon_threadsafe() overcomes the first and third limitations, but put_nowait() only actually takes effect when the event loop handles the corresponding callback: no thread will ever have up-to-date information about the state of the queue. You also lose the checks - exceptions such as QueueFull are raised inside the event loop rather than in the calling thread.

The solution via asyncio.Queue/queue.Queue + run_coroutine_threadsafe() (partially presented here) overcomes the first and third limitations, but without waiting for the returned future it reduces to call_soon_threadsafe(), and with waiting your thread also becomes dependent on the event loop: if the loop gets sick, your thread will too.

The solution via queue.Queue + call_soon_threadsafe() overcomes all three limitations, but it does not support cancellation, so in case of a timeout the next waiting task may not be woken up. It also does not handle the case where the event loop is closed before put() is called, which may raise a RuntimeError.

The solution via queue.Queue + run_in_executor() (not presented here, but sketched just below) overcomes all three limitations, but does not support cancellation, because threads cannot be cancelled. As a result, if you try to cancel, you will lose an item from the queue, leak a thread, or even exhaust the limit on the number of concurrently running executor threads - and your application will hang.
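
For reference, here is a minimal sketch of what the run_in_executor() approach usually looks like (my own illustration, not code from any answer here; the names sync_queue and consumer are made up):

import asyncio
import queue
import threading
import time

sync_queue = queue.Queue()

def threaded():
    while True:
        time.sleep(2)
        sync_queue.put(time.time())
        print(sync_queue.qsize())

async def consumer():
    loop = asyncio.get_running_loop()
    while True:
        # queue.Queue.get() blocks, so hand it off to a worker thread;
        # cancelling this await leaves that worker stuck in get()
        item = await loop.run_in_executor(None, sync_queue.get)
        print(item)

threading.Thread(target=threaded, daemon=True).start()
asyncio.run(consumer())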

As you can see, no classic solution is production-ready. As for libraries, they have problems too, because they actually adopt the approaches described above.

I have shown that all popular solutions have their downsides. Is that all? No, because I want to offer you my unpopular solutions: I'm the creator of two packages designed to solve this and many other problems.

The most relevant is the culsans package. It provides queues similar to janus, but with good performance and additional features. It includes all the benefits of janus, but also overcomes the second limitation, guarantees queue fairness, works with more than just asyncio, and more. It doesn't create tasks, so culsans queues don't need to be closed.

import asyncio
import threading
import time

import culsans

queue = culsans.Queue()

def threaded():
    while True:
        time.sleep(2)
        queue.sync_q.put_nowait(time.time())
        print(queue.sync_q.qsize())

async def async_func():
    while True:
        time = await queue.async_q.get()
        print(time)

threading.Thread(target=threaded).start()
asyncio.run(async_func())

The aiologic package does not provide API-compatible interfaces, but it does provide a number of different synchronization primitives, such as the locks, semaphores, and barriers you already know. It has two features that culsans doesn't: it doesn't use synchronization primitives internally (so your event loop will never block), and each asynchronous call performs at most one context switch (except for methods like Condition.__await__()). The price of these two features is the inability to combine synchronous and asynchronous calls in the same thread without risk of deadlock (with culsans you can), a non-standard interface, and some implementation specifics. But if your application is performance-sensitive, it may be the best solution for you.

import asyncio
import threading
import time

import aiologic

queue = aiologic.SimpleQueue()

def threaded():
    while True:
        time.sleep(2)
        queue.put(time.time())
        print(len(queue))

async def async_func():
    while True:
        time = await queue.async_get()
        print(time)

threading.Thread(target=threaded).start()
asyncio.run(async_func())

Upvotes: 2

dano

Reputation: 94961

asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus, which is a third-party library that provides a thread-aware asyncio queue.

import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async_func(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
loop.create_task(async_func(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()
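
Note that this snippet reflects an older janus API. With current Python and current janus releases, where the loop= argument is gone and the queue has to be created while the event loop is running, roughly the following should work (a sketch under that assumption):

import asyncio
import threading
import time

import janus

def threaded(sync_q):
    while True:
        time.sleep(2)
        sync_q.put_nowait(time.time())
        print(sync_q.qsize())

async def consumer(async_q):
    while True:
        print(await async_q.get())

async def main():
    queue = janus.Queue()  # must be created inside the running loop
    threading.Thread(target=threaded, args=(queue.sync_q,), daemon=True).start()
    await consumer(queue.async_q)

asyncio.run(main())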

There is also aioprocessing (full disclosure: I wrote it), which provides process-safe (and, as a side effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.
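
If you did want to go that route, a rough sketch using aioprocessing's coro_-prefixed asynchronous methods might look like this (illustrative only, assuming an AioQueue with coro_get(); check the aioprocessing docs for the exact API):

import asyncio
import threading
import time

import aioprocessing

queue = aioprocessing.AioQueue()

def threaded():
    while True:
        time.sleep(2)
        queue.put(time.time())  # plain blocking put from the worker thread

async def consumer():
    while True:
        # coro_get() runs the blocking get() in an executor under the hood
        print(await queue.coro_get())

threading.Thread(target=threaded, daemon=True).start()
asyncio.run(consumer())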

Edit

As pointed out in other answers, for simple use cases you can also use loop.call_soon_threadsafe to add to the queue.

Upvotes: 36

vladimirfol

Reputation: 455

What about just combining threading.Lock, queue.Queue, and an asyncio future?

import asyncio
import queue
import threading


class ThreadSafeAsyncFuture(asyncio.Future):
    """ asyncio.Future is not thread-safe
    https://stackoverflow.com/questions/33000200/asyncio-wait-for-event-from-other-thread
    """
    def set_result(self, result):
        func = super().set_result
        call = lambda: func(result)
        self._loop.call_soon_threadsafe(call)  # Warning: self._loop is undocumented


class ThreadSafeAsyncQueue(queue.Queue):
    """ asyncio.Queue is not thread-safe, queue.Queue is not awaitable
    works only with one putter, an unlimited-size queue, and several getters
    TODO: add maxsize limits
    TODO: make put a coroutine
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()
        self.loop = asyncio.get_event_loop()
        self.waiters = []

    def put(self, item):
        with self.lock:
            if self.waiters:
                self.waiters.pop(0).set_result(item)
            else:
                super().put(item)

    async def get(self):
        with self.lock:
            if not self.empty():
                return super().get()
            else:
                fut = ThreadSafeAsyncFuture()
                self.waiters.append(fut)
        result = await fut
        return result
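
A minimal usage sketch (illustrative, not part of the original snippet; it assumes the classes above are defined and creates the queue while the loop is running, since __init__ calls get_event_loop()):

import asyncio
import threading
import time

async def main():
    tsq = ThreadSafeAsyncQueue()

    def producer():
        for _ in range(3):
            time.sleep(1)
            tsq.put(time.time())

    threading.Thread(target=producer, daemon=True).start()
    for _ in range(3):
        print(await tsq.get())

asyncio.run(main())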

See also - asyncio: Wait for event from other thread

Upvotes: 3

cronos

Reputation: 2448

If you do not want to use another library, you can schedule a coroutine from the thread. Replacing the queue.put_nowait call with the following works fine.

asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)

The variable loop represents the event loop in the main thread.
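
For completeness, a self-contained version of that approach might look roughly like this (my arrangement; the names threaded and main are just illustrative):

import asyncio
import threading
import time

def threaded(queue, loop):
    while True:
        time.sleep(2)
        # schedule queue.put() on the loop's thread and block until it has run there
        asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop).result()

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(target=threaded, args=(queue, loop), daemon=True).start()
    while True:
        print(await queue.get())

asyncio.run(main())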

EDIT:

The reason your async coroutine is not doing anything is that the event loop never gives it a chance to do so. The queue object is not thread-safe, and if you dig through the CPython source you will find that put_nowait wakes up consumers of the queue through a future, using the event loop's call_soon method. If we could make it use call_soon_threadsafe instead, it should work. The major difference between call_soon and call_soon_threadsafe, however, is that call_soon_threadsafe wakes up the event loop by calling loop._write_to_self(). So let's call it ourselves:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        queue._loop._write_to_self()
        print(queue.qsize())

@asyncio.coroutine
def async_func():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async_func())
threading.Thread(target=threaded).start()
loop.run_forever()

Then, everything works as expected.

As for the thread-safety aspect of accessing shared objects, asyncio.Queue uses collections.deque under the hood, which has thread-safe append and popleft. The check that the queue is not empty followed by the popleft is not atomic, but if you consume the queue only in one thread (the one running the event loop) it should be fine.

The other proposed solutions, loop.call_soon_threadsafe from Huazuo Gao's answer and my asyncio.run_coroutine_threadsafe, are doing just this: waking up the event loop.

Upvotes: 25

Huazuo Gao

Reputation: 1733

BaseEventLoop.call_soon_threadsafe is at hand. See the asyncio docs for details.

Simply change your threaded() like this:

def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))

Here's a sample output:

0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943

Upvotes: 14
