Adrian Sevcenco

Reputation: 311

asyncio :: access a websocket created in another loop

I am trying to convert some functions so that they are NOT async, so I run them with run_until_complete. However, I would like to create a websocket in a non-async function and then use that websocket from several other functions that are also not async. Trying to get the loop and run another function gives me an exception saying that a loop is already running. Is there a way to do this? Thank you!

Upvotes: 1

Views: 711

Answers (1)

user4815162342

Reputation: 155166

Please be aware that what you are attempting goes against the design of asyncio, where the idea is that the whole program (or at least the whole thread that talks to the server) goes async. However, it is still possible to execute async functions as sync; you'll just need to use some tricks.

Your best bet is to create a dedicated thread that runs the event loop, and submit tasks to the event loop using asyncio.run_coroutine_threadsafe().
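As a minimal sketch of that idea, before any decorator is involved: the names loop, add and add_sync below are made up for illustration, and the loop is kept alive with loop.run_forever() rather than the helper shown further down.

```python
import asyncio
import threading

# Hypothetical names for illustration; only the asyncio/threading calls are real APIs.
loop = asyncio.new_event_loop()
# The daemon thread does nothing but run the event loop.
threading.Thread(target=loop.run_forever, daemon=True).start()

async def add(a, b):
    await asyncio.sleep(0.01)  # stands in for real async I/O
    return a + b

def add_sync(a, b):
    # Hand the coroutine to the loop thread, then block until it finishes.
    future = asyncio.run_coroutine_threadsafe(add(a, b), loop)
    return future.result(timeout=5)

result = add_sync(2, 3)
```

Here add_sync is an ordinary blocking function, yet the coroutine itself always runs on the one background loop.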

You can even abstract it into a decorator that turns any async method into sync:

import threading, asyncio

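def start_asyncio():
    # start an event loop in a daemon thread and return it to the caller
    loop = None
    ready = threading.Event()
    async def wait_forever():
        nonlocal loop
        # get_running_loop() is the preferred call inside a coroutine
        loop = asyncio.get_running_loop()
        ready.set()
        # this future is never resolved, so asyncio.run() keeps the loop alive
        await loop.create_future()
    threading.Thread(daemon=True, target=asyncio.run,
                     args=(wait_forever(),)).start()
    # block until the loop thread has published the loop object
    ready.wait()
    return loop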

def syncify(fn):
    def syncfn(*args, **kwds):
        # submit the original coroutine to the event loop running in
        # the background thread (_loop is the module-level loop returned
        # by start_asyncio()) and wait for the result
        conc_future = asyncio.run_coroutine_threadsafe(
            fn(*args, **kwds), _loop)
        return conc_future.result()
    syncfn.as_async = fn
    return syncfn

The decorator creates a sync adapter that submits the underlying async function to the event loop and waits for the result. It can be used for websockets like this:

import websockets

_loop = start_asyncio()

@syncify
async def open_websocket(uri):
    ws = await websockets.connect(uri)
    return ws

@syncify
async def send_to_websocket(ws, msg):
    await ws.send(msg)

@syncify
async def recv_from_websocket(ws):
    return await ws.recv()

@syncify
async def start_echo_server(host, port):
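    # _path defaults to None so the handler works whether or not the
    # installed websockets version passes a path argument
    async def _echo(ws, _path=None):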
        msg = await ws.recv()
        await ws.send('echo ' + msg)

    await websockets.serve(_echo, host, port)

All these functions are defined with async def, but thanks to the decorator, they are callable as sync functions. Here is a test:

def test():
    start_echo_server("localhost", 8765)
    ws = open_websocket("ws://localhost:8765")
    send_to_websocket(ws, "hi there")
    assert recv_from_websocket(ws) == "echo hi there"

if __name__ == '__main__':
    test()
    print('ok')

Note that you must not call one "syncified" function from another: the inner call would block the event loop thread while waiting for a result that only that same loop can produce, so it would deadlock. That is why the decorator offers an escape hatch, the as_async attribute, which allows you to do something like:

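import aiohttp

@syncify
async def talk(ws):
    # some_url is a placeholder for whatever you want to fetch
    async with aiohttp.ClientSession() as session:
        async with session.get(some_url) as resp:
            data = await resp.text()
    await send_to_websocket.as_async(ws, data)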
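The same pattern can be tried without any network access. The sketch below is self-contained and makes a few assumptions: start_asyncio is a condensed stand-in based on loop.run_forever(), the result() call gets a timeout, and fetch_greeting and shout_greeting are hypothetical functions invented for the example.

```python
import asyncio
import threading

def start_asyncio():
    # Condensed stand-in: run a fresh loop forever in a daemon thread.
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop

_loop = start_asyncio()

def syncify(fn):
    def syncfn(*args, **kwds):
        # Submit to the background loop and block for the result.
        future = asyncio.run_coroutine_threadsafe(fn(*args, **kwds), _loop)
        return future.result(timeout=5)
    syncfn.as_async = fn  # escape hatch: the undecorated coroutine function
    return syncfn

@syncify
async def fetch_greeting(name):
    await asyncio.sleep(0.01)  # stands in for real async I/O
    return "hello " + name

@syncify
async def shout_greeting(name):
    # Calling fetch_greeting(name) here would block the loop thread while
    # waiting on work that only this loop can run; await as_async instead.
    text = await fetch_greeting.as_async(name)
    return text.upper()

greeting = shout_greeting("bob")
```

From sync code you call shout_greeting("bob") directly; inside other coroutines on the loop you await the .as_async variants.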

The downside of this approach is that run_coroutine_threadsafe has a non-negligible per-call overhead in Python, even though it only needs to submit the job to an event loop that is already running. Last time I measured, it was actually faster to call asyncio.run(x()) than asyncio.run_coroutine_threadsafe(x(), loop), despite the former setting up and tearing down a whole event loop. Still, as you discovered, solutions based on loop.run_until_complete and asyncio.run have the limitation that they cannot nest.
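If you want to check the overhead on your own setup, a rough timing sketch could look like the following. The function names and the iteration count are arbitrary choices, and the numbers will likely vary with Python version and platform, so treat the output as indicative only.

```python
import asyncio
import threading
import time

async def noop():
    return None

def time_asyncio_run(n):
    # Average cost of creating and tearing down a loop for each call.
    start = time.perf_counter()
    for _ in range(n):
        asyncio.run(noop())
    return (time.perf_counter() - start) / n

def time_threadsafe(n):
    # Average cost of submitting to a loop already running in another thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    start = time.perf_counter()
    for _ in range(n):
        asyncio.run_coroutine_threadsafe(noop(), loop).result()
    elapsed = (time.perf_counter() - start) / n
    # Shut the background loop down cleanly.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return elapsed

per_call_run = time_asyncio_run(200)
per_call_threadsafe = time_threadsafe(200)
print(f"asyncio.run:              {per_call_run * 1e6:.1f} us per call")
print(f"run_coroutine_threadsafe: {per_call_threadsafe * 1e6:.1f} us per call")
```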

Upvotes: 3
