Rick Vink

Reputation: 333

asyncio event wait function does not continue after event has been stored

I'm working on a function that should continue after a response has been given through a websocket. In order to achieve this I use an asyncio event which will be set after the response has been given.

Three functions are involved:

async def send(self, message):
    await self.channel(message.toJSON())
    if (message.method == 'get' or message.method == 'post'):
        event = asyncio.Event()
        self._queueMessage(message, event)
        await event.wait()
    print('continue')

def _queueMessage(self, message, event):
    self.queue.append([message, event])

def _process_response_message(self, message):
    for entry in self.queue:
        if (message['_id'] == entry[0]._id):
            print(entry[1])
            entry[1].set()
            print(entry[1])
            return

This prints:

<asyncio.locks.Event object at 0x7f3a1ff20da0 [unset,waiters:1]>
<asyncio.locks.Event object at 0x7f3a1ff20da0 [set,waiters:1]>

In this example, print('continue') is never reached, and I do not know why: .set() is actually called, and .set() does seem to work fine if I call it before await event.wait().

Is there something I'm missing?

Upvotes: 2

Views: 1771

Answers (1)

Mikhail Gerasimov

Reputation: 39576

Based on the output you get, _process_response_message seems to be running in another thread. asyncio.Event is not a thread-safe object, so you should use loop.call_soon_threadsafe to call its methods from another thread. Try changing your code like this:

import asyncio

async def send(self, message):
    await self.channel(message.toJSON())
    if message.method in ('get', 'post'):
        # Remember the loop this coroutine runs on so another thread can reach it.
        loop = asyncio.get_running_loop()
        event = asyncio.Event()
        self._queueMessage(message, loop, event)
        await event.wait()
    print('continue')

def _queueMessage(self, message, loop, event):
    self.queue.append([message, loop, event])

def _process_response_message(self, message):
    for entry in self.queue:
        qmsg, loop, event = entry
        if message['_id'] == qmsg._id:
            # Schedule event.set() on the loop's own thread instead of calling it here.
            loop.call_soon_threadsafe(event.set)
            return
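
If you want to check the idea in isolation, here is a minimal standalone sketch. It assumes the response handler runs in a plain threading.Thread; the names wait_for_response and worker are made up for illustration and stand in for your send and _process_response_message. The timeout is only there so the script fails instead of hanging if the event never gets set.

```python
import asyncio
import threading


async def wait_for_response() -> str:
    # The loop this coroutine is running on; the worker thread needs a handle to it.
    loop = asyncio.get_running_loop()
    event = asyncio.Event()

    def worker() -> None:
        # Runs in a separate thread (hypothetical stand-in for the websocket callback).
        # Hand event.set over to the loop's thread rather than calling it directly.
        loop.call_soon_threadsafe(event.set)

    thread = threading.Thread(target=worker)
    thread.start()

    # Raises asyncio.TimeoutError if the event is not set within 5 seconds.
    await asyncio.wait_for(event.wait(), timeout=5)
    thread.join()
    return "continue"


result = asyncio.run(wait_for_response())
print(result)
```

call_soon_threadsafe queues the callback and wakes the loop, which is why the waiter gets resumed here.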

Upvotes: 2
