Reputation: 311
I am trying to convert some functions so that they are no longer async, so I run them with run_until_complete. However, I would like to create a websocket in a non-async function and then use that websocket from several other non-async functions. Trying to get the loop and run another function gives me an exception saying that a loop is already running. Is there a way to do this? Thank you!
Upvotes: 1
Views: 711
Reputation: 155166
Please be aware that what you are attempting goes against the design of asyncio, where the idea is that the whole program (or at least the whole thread that talks to the server) goes async. However, it is still possible to execute async functions as sync; you'll just need to use some tricks.
Your best bet is to create a dedicated thread that runs the event loop, and submit tasks to the event loop using asyncio.run_coroutine_threadsafe().
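As a minimal sketch of that idea, without any decorator, the snippet below starts an event loop in a background thread and submits a single coroutine to it from sync code. The names start_background_loop and add are made up for illustration, and add is only a stand-in for real async work such as a websocket call.

```python
import asyncio
import threading


def start_background_loop():
    """Start an event loop in a daemon thread and return the loop."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def add(a, b):
    # hypothetical stand-in for real async work
    await asyncio.sleep(0.01)
    return a + b


loop = start_background_loop()
# submit the coroutine from the (sync) main thread and block on the result
future = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
result = future.result(timeout=5)
print(result)
```

The object returned by run_coroutine_threadsafe is a concurrent.futures.Future, so its result() call blocks the calling thread rather than the event loop.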
You can even abstract it into a decorator that turns any async method into sync:
import asyncio
import threading

def start_asyncio():
    loop = None
    ready = threading.Event()

    async def wait_forever():
        nonlocal loop
        loop = asyncio.get_running_loop()
        ready.set()
        # a future that is never resolved keeps the loop alive
        await loop.create_future()

    threading.Thread(daemon=True, target=asyncio.run,
                     args=(wait_forever(),)).start()
    # wait until the background thread has published its loop
    ready.wait()
    return loop

def syncify(fn):
    def syncfn(*args, **kwds):
        # submit the original coroutine to the event loop
        # and wait for the result
        conc_future = asyncio.run_coroutine_threadsafe(
            fn(*args, **kwds), _loop)
        return conc_future.result()
    # keep the original coroutine function reachable
    syncfn.as_async = fn
    return syncfn
The decorator creates a sync adapter that submits the underlying async function to the event loop and waits for the result. It can be used for websockets like this:
import websockets

_loop = start_asyncio()

@syncify
async def open_websocket(uri):
    ws = await websockets.connect(uri)
    return ws

@syncify
async def send_to_websocket(ws, msg):
    await ws.send(msg)

@syncify
async def recv_from_websocket(ws):
    return await ws.recv()

@syncify
async def start_echo_server(host, port):
    # _path defaults to None because newer websockets versions
    # pass only the connection to the handler
    async def _echo(ws, _path=None):
        msg = await ws.recv()
        await ws.send('echo ' + msg)
    await websockets.serve(_echo, host, port)
All these functions are defined with async def, but thanks to the decorator, they are callable as sync functions. Here is a test:
def test():
    start_echo_server("localhost", 8765)
    ws = open_websocket("ws://localhost:8765")
    send_to_websocket(ws, "hi there")
    assert recv_from_websocket(ws) == "echo hi there"

if __name__ == '__main__':
    test()
    print('ok')
Note that it is not allowed to call one "syncified" function from another, simply because the call would block the event loop while waiting for a result that only the event loop can produce. But that is why the decorator offers an escape hatch, the as_async attribute, which allows you to do something like:
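import aiohttp

@syncify
async def talk(ws):
    # some_url is a placeholder for the URL to fetch
    async with aiohttp.ClientSession() as session:
        async with session.get(some_url) as resp:
            data = await resp.text()
    # await the underlying coroutine, not the sync wrapper
    await send_to_websocket.as_async(ws, data)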
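To illustrate why the sync wrapper must not be called from code that is already running on the event loop, here is a small self-contained sketch. It does not use syncify itself. It reproduces what the wrapper does internally, and the names inner, outer_blocking and outer_awaiting are hypothetical. The timeout is there only so that the demo terminates instead of hanging forever.

```python
import asyncio
import concurrent.futures
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def inner():
    return "inner result"


async def outer_blocking():
    # WRONG: this blocks the loop thread while waiting for work
    # that only the loop thread could perform.
    fut = asyncio.run_coroutine_threadsafe(inner(), loop)
    try:
        return fut.result(timeout=0.2)  # timeout only so the demo ends
    except concurrent.futures.TimeoutError:
        return "timed out"


async def outer_awaiting():
    # RIGHT: await the coroutine directly, which is what as_async enables.
    return await inner()


blocked = asyncio.run_coroutine_threadsafe(outer_blocking(), loop).result(timeout=5)
awaited = asyncio.run_coroutine_threadsafe(outer_awaiting(), loop).result(timeout=5)
print(blocked, "/", awaited)
```

Without the timeout, the blocking variant would never return, because the loop cannot start inner() while its own thread is stuck in result().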
The downside of this approach is that in Python run_coroutine_threadsafe takes a non-negligible amount of time, although it just needs to submit the job to a running event loop. Last time I measured, it was actually faster to call asyncio.run(x()) than asyncio.run_coroutine_threadsafe(x(), loop), despite the former setting up and tearing down a whole event loop. Still, as you discovered, solutions based on loop.run_until_complete and asyncio.run have the limitation that they cannot nest.
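If you want to check the overhead on your own setup, a rough timing sketch along these lines might help. The helper names and the iteration count are arbitrary choices for illustration, and the numbers will vary with the Python version and the machine, so treat the output as indicative only.

```python
import asyncio
import threading
import time


async def noop():
    return None


def time_asyncio_run(n):
    # average cost of creating a loop, running noop() and closing the loop
    start = time.perf_counter()
    for _ in range(n):
        asyncio.run(noop())
    return (time.perf_counter() - start) / n


def time_threadsafe(n, loop):
    # average cost of submitting noop() to an already running loop
    start = time.perf_counter()
    for _ in range(n):
        asyncio.run_coroutine_threadsafe(noop(), loop).result()
    return (time.perf_counter() - start) / n


loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

n = 200
per_run = time_asyncio_run(n)
per_submit = time_threadsafe(n, loop)
print(f"asyncio.run:              {per_run * 1e6:.1f} us per call")
print(f"run_coroutine_threadsafe: {per_submit * 1e6:.1f} us per call")
```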
Upvotes: 3