Startec
Startec

Reputation: 13206

How to convert a function in a third party library to be async?

I am using my Raspberry Pi and the pigpio and websockets libraries.

I want my program to run asynchronously (i.e. I will use async def main as the entry point).

The pigpio library expects a synchronous callback function to be called in response to events, which is fine, but from within that callback I want to call another, asynchronous function from the websocket library.

So it would look like:

def sync_cb(): # <- This can not be made async, therefore I can not use await
   [ws.send('test') for ws in connected_ws] # <- This is async and has to be awaited

Currently I can get it to work with:

def sync_cb():
    asyncio.run(asyncio.wait([ws.send('test') for ws in connected_ws]))

but the docs say this use of asyncio.run is discouraged.

So my synchronous callback needs to call ws.send (also from a third party library) which is async from a function that is synchronous.

Another option that works is:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.gather(*[ws.send(json.dumps(message)) for ws in connected_ws]))

But the three lines of creating and setting an even loop sounds like a lot just to run a simple async function.

My questions are:

Upvotes: 8

Views: 2764

Answers (2)

Artemij Rodionov
Artemij Rodionov

Reputation: 1826

Is it possible to substitute an async function where a synchronous callback is required

It is possible. You can run event loop in separate thread and emit async code there, but you have to consider GIL.

import asyncio
import threading

class Portal:

    def __init__(self, stop_event):
        self.loop = asyncio.get_event_loop()
        self.stop_event = stop_event

    async def _call(self, fn, args, kwargs):
        return await fn(*args, **kwargs)

    async def _stop(self):
        self.stop_event.set()

    def call(self, fn, *args, **kwargs):
        return asyncio.run_coroutine_threadsafe(self._call(fn, args, kwargs), self.loop)

    def stop(self):
        return self.call(self._stop)

def create_portal():
    portal = None

    async def wait_stop():
        nonlocal portal
        stop_event = asyncio.Event()
        portal = Portal(stop_event)
        running_event.set()
        await stop_event.wait()

    def run():
        asyncio.run(wait_stop())

    running_event = threading.Event()
    thread = threading.Thread(target=run)
    thread.start()
    running_event.wait()

    return portal

Usage example:

async def test(msg):
    await asyncio.sleep(0.5)
    print(msg)
    return "HELLO " + msg

# it'll run a new event loop in separate thread
portal = create_portal()
# it'll call `test` in the separate thread and return a Future 
print(portal.call(test, "WORLD").result())
portal.stop().result()

In your case:

def sync_cb():
    calls = [portal.call(ws.send, 'test') for ws in connected_ws]
    # if you want to get results from these calls:
    # [c.result() for c in calls]

And, what kind of overhead am I incurring by using asyncio.run and asyncio.wait just to call a simple async method

asyncio.run will create a new event loop and close it then. Most likely if the callback is not called often it won't be a problem. But if you will use asyncio.run in another callback too, then they won't be able to work concurrently.

Upvotes: 0

alex_noname
alex_noname

Reputation: 32083

You could use run_coroutine_threadsafe function returning concurrent.furures.Future, which can be waited synchronously, to wrap coroutine to regular function and call it from synchronous code.

As I understand it, this approach is more appropriate if sync code (of third party lib) is executed in separate thread, but it can be adapted to single-threaded execution with some modifications.

An example to illustrate the approach:

import asyncio


def async_to_sync(loop, foo):
    def foo_(*args, **kwargs):
        return asyncio.run_coroutine_threadsafe(foo(*args, **kwargs), loop).result()  
    return foo_


def sync_code(cb):
    for i in range(10):
        cb(i)


async def async_cb(a):
    print("async callback:", a)


async def main():
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, sync_code, async_to_sync(loop, async_cb))

asyncio.run(main())

Output:

async callback: 0
async callback: 1
async callback: 2
...

Upvotes: 1

Related Questions