Reputation: 13206
I am using my Raspberry Pi and the pigpio and websockets libraries.
I want my program to run asynchronously (i.e. I will use async def main as the entry point).
The pigpio library expects a synchronous callback function to be called in response to events, which is fine, but from within that callback I want to call another, asynchronous function from the websockets library.
So it would look like:
def sync_cb():  # <- This can not be made async, therefore I can not use await
    [ws.send('test') for ws in connected_ws]  # <- This is async and has to be awaited
Currently I can get it to work with:
def sync_cb():
    asyncio.run(asyncio.wait([ws.send('test') for ws in connected_ws]))
but the docs say this use of asyncio.run is discouraged.
So my synchronous callback needs to call ws.send (also from a third-party library), which is async, from a function that is synchronous.
Another option that works is:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.gather(*[ws.send(json.dumps(message)) for ws in connected_ws]))
But three lines for creating and setting an event loop seems like a lot just to run a simple async function.
My questions are:
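1. Is it possible to substitute an async function where a synchronous callback is required (i.e. can cb be made async in this example)?
2. And, what kind of overhead am I incurring by using asyncio.run and asyncio.wait just to call a simple async method (in the list comprehension)?
Upvotes: 8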
Views: 2764
Reputation: 1826
Is it possible to substitute an async function where a synchronous callback is required
It is possible. You can run an event loop in a separate thread and submit async code to it, but you have to keep the GIL in mind.
import asyncio
import threading

class Portal:
    def __init__(self, stop_event):
        self.loop = asyncio.get_event_loop()
        self.stop_event = stop_event

    async def _call(self, fn, args, kwargs):
        return await fn(*args, **kwargs)

    async def _stop(self):
        self.stop_event.set()

    def call(self, fn, *args, **kwargs):
        return asyncio.run_coroutine_threadsafe(self._call(fn, args, kwargs), self.loop)

    def stop(self):
        return self.call(self._stop)

def create_portal():
    portal = None

    async def wait_stop():
        nonlocal portal
        stop_event = asyncio.Event()
        portal = Portal(stop_event)
        running_event.set()
        await stop_event.wait()

    def run():
        asyncio.run(wait_stop())

    running_event = threading.Event()
    thread = threading.Thread(target=run)
    thread.start()
    running_event.wait()
    return portal
Usage example:
async def test(msg):
    await asyncio.sleep(0.5)
    print(msg)
    return "HELLO " + msg
# it'll run a new event loop in separate thread
portal = create_portal()
# it'll call `test` in the separate thread and return a Future
print(portal.call(test, "WORLD").result())
portal.stop().result()
In your case:
def sync_cb():
    calls = [portal.call(ws.send, 'test') for ws in connected_ws]
    # if you want to get results from these calls:
    # [c.result() for c in calls]
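One thing worth knowing when collecting results this way: the object returned by asyncio.run_coroutine_threadsafe is a concurrent.futures.Future, so calling result() on it re-raises any exception from the coroutine in the calling thread. The sketch below is only an illustration and does not use the Portal class above; failing_send is a hypothetical coroutine standing in for a ws.send that fails, and the loop is run in a plain background thread.

```python
import asyncio
import threading

# A background event loop, started the simplest way possible for this sketch.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

async def failing_send():
    # Hypothetical stand-in for a ws.send() whose peer has disconnected.
    raise ConnectionError("peer went away")

caught = None
future = asyncio.run_coroutine_threadsafe(failing_send(), loop)
try:
    # Blocks the calling (synchronous) thread until the coroutine finishes;
    # the timeout keeps a stuck coroutine from blocking forever.
    future.result(timeout=5)
except ConnectionError as exc:
    caught = str(exc)

# Shut the background loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()

print("caught:", caught)
```

If the callback should never block or raise, skipping result() and attaching add_done_callback to each future is another option.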
And, what kind of overhead am I incurring by using asyncio.run and asyncio.wait just to call a simple async method
asyncio.run creates a new event loop and closes it when the coroutine finishes. If the callback is not called often, this most likely won't be a problem. But if you also use asyncio.run in another callback, the two won't be able to run concurrently.
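To make the per-call cost visible, here is a small sketch (not part of the original answer) that records which loop object is running inside two consecutive asyncio.run calls. The names seen_loops and record_loop are made up for the illustration.

```python
import asyncio

seen_loops = []

async def record_loop():
    # Remember the loop this coroutine is running on.
    seen_loops.append(asyncio.get_running_loop())

def sync_cb():
    # Each call sets up a fresh event loop, runs the coroutine, then tears the loop down.
    asyncio.run(record_loop())

sync_cb()
sync_cb()

different_loops = seen_loops[0] is not seen_loops[1]
all_closed = all(loop.is_closed() for loop in seen_loops)
print("different loop per call:", different_loops)
print("loops closed afterwards:", all_closed)
```

Every invocation pays for creating and closing a loop, and nothing scheduled on one of these loops survives past the call that created it.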
Upvotes: 0
Reputation: 32083
You could use the run_coroutine_threadsafe function, which returns a concurrent.futures.Future that can be waited on synchronously, to wrap a coroutine in a regular function and call it from synchronous code.
As I understand it, this approach is more appropriate if the sync code (of the third-party lib) is executed in a separate thread, but it can be adapted to single-threaded execution with some modifications.
An example to illustrate the approach:
import asyncio

def async_to_sync(loop, foo):
    def foo_(*args, **kwargs):
        return asyncio.run_coroutine_threadsafe(foo(*args, **kwargs), loop).result()
    return foo_

def sync_code(cb):
    for i in range(10):
        cb(i)

async def async_cb(a):
    print("async callback:", a)

async def main():
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, sync_code, async_to_sync(loop, async_cb))

asyncio.run(main())
Output:
async callback: 0
async callback: 1
async callback: 2
...
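Applied to the setup in the question, the same idea might look like the sketch below. It rests on assumptions: FakeWebSocket is a hypothetical stand-in for a real websockets connection, and a plain threading.Thread simulates the library thread that would invoke the callback (this assumes the callback is delivered from a thread other than the one running the event loop). The callback captures the main loop and hands each send to it.

```python
import asyncio
import threading

class FakeWebSocket:
    """Hypothetical stand-in for a websockets connection (illustration only)."""
    def __init__(self):
        self.sent = []

    async def send(self, message):
        await asyncio.sleep(0)  # yield to the loop, as a real send might
        self.sent.append(message)

connected_ws = [FakeWebSocket(), FakeWebSocket()]

def make_sync_cb(loop):
    """Build a synchronous callback bound to the already running main loop."""
    def sync_cb():
        futures = [
            asyncio.run_coroutine_threadsafe(ws.send('test'), loop)
            for ws in connected_ws
        ]
        # Optional: wait for the sends to finish; safe here because this
        # runs in a different thread than the event loop.
        for future in futures:
            future.result(timeout=5)
    return sync_cb

async def main():
    loop = asyncio.get_running_loop()
    # Simulate the third-party library firing the callback from its own thread.
    worker = threading.Thread(target=make_sync_cb(loop))
    worker.start()
    # Join in the default executor so the event loop stays free to run the sends.
    await loop.run_in_executor(None, worker.join)

asyncio.run(main())
print([ws.sent for ws in connected_ws])
```

Calling future.result() from the event loop's own thread would deadlock, so in a single-threaded variant the callback should only schedule the work (for example with loop.create_task) and return.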
Upvotes: 1