Paal Pedersen
Paal Pedersen

Reputation: 1297

How to call a async function from a synchronized code Python

So I'm locked to a python 3.6.2 interpreter that follows my desktop application.

What I want is to call an async function from a synchronized method or function.

When calling the python function from the desktop application it has to be a normal function which can not be awaited.

From the desktop application I am able to send a list of urls, and what I want is to send back response from every url in an async matter.

here is my try I've marked the SyntaxError which I don't know how to bypass.

import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        -> SyntaxError: newfeature = await main(urls_and_coords)
        self.pyoutput(newfeature)
        
    def close(self):
       pass 

async def main(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        feature = loop.run_until_complete(fetch_all(session, urls, loop))
        return feature
        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            newFeature = fmeobjects.FMEFeature()
            response_data = await response
            newFeature.setAttribute('response', response_data)
            newFeature.setAttribute('_xmin',url[1])
            newFeature.setAttribute('_xmax',url[2])
            newFeature.setAttribute('_ymin',url[3])
            newFeature.setAttribute('_ymax',url[4])
            return newFeature

I have tried making these changes:

import fme
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(main(loop, urls_and_coords))
        #feature.setAttribute('result',result)
        self.pyoutput(feature)
        
    def close(self):
       pass 

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await fetch_all(session, urls, loop)

        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            #newFeature = fmeobjects.FMEFeature()
            response = await response
            #newFeature.setAttribute('response', response_data)
            #newFeature.setAttribute('_xmin',url[1])
            #newFeature.setAttribute('_xmax',url[2])
            #newFeature.setAttribute('_ymin',url[3])
            #newFeature.setAttribute('_ymax',url[4])
            return response, url[1], url[2], url[3], url[4]


        

but now I end up with this error:

Python Exception <TypeError>: object ClientResponse can't be used in 'await' 
expression
Traceback (most recent call last):
  File "<string>", line 20, in input
  File "asyncio\base_events.py", line 467, in run_until_complete
  File "<string>", line 29, in main
  File "<string>", line 33, in fetch_all
  File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression

Upvotes: 89

Views: 127073

Answers (5)

Mountain
Mountain

Reputation: 81

I used the following code to call async functions using ros1 callbacks.
Wrap the async functions in a decorator with context to the "loop" variable and executing "run_coroutine_threadsafe".

The wrapped functions can then be used directly inside the ros1 sub callback.

This method creates a separate thread to run the async loop.
rospy.spin() and other blocking calls would prevent the async loop from functioning otherwise.

from contextlib import contextmanager
from asyncio import AbstractEventLoop as Loop
import asyncio
from threading import Thread
import time


@contextmanager
def async_context():
    """
    Creats an async loop on a seperate thread
    Run tasks on async loop from other threads with:
        asyncio.run_coroutine_threadsafe(async_function(*arg, **kwargs), loop)
    """

    def start_event_loop(loop: Loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()

    try:
        loop = asyncio.new_event_loop()
        loop_thread = Thread(target=start_event_loop, args=(loop,), daemon=True)
        loop_thread.start()
        yield loop
    finally:
        loop.call_soon_threadsafe(loop.stop)
        loop_thread.join()


async def example_async_function(time: float):
    print("async sleep: started")
    await asyncio.sleep(time)
    print("async sleep: done")


def main():
    with async_context() as loop:
        asyncio.run_coroutine_threadsafe(example_async_function(2), loop)
        print("sync sleep: started")
        time.sleep(5)
        print("sync sleep: done")


if __name__ == "__main__":
    main()

Upvotes: 0

cod3monk3y
cod3monk3y

Reputation: 9853

I was able to get this working in pure python 3.10 using the built-in asyncio.run_coroutine_threadsafe.

This is new to me, so there are probably some caveats, e.g. since the async method is not actually awaited, the process could (will) exit before the callback completes (unless you do something to ensure it doesn't).

For a reference on where this might occur in the wild, see the bleak BLE library class BleakClient callback method disconnected_callback. Then, in the callback try to emit using the async version of socket.io client, AsyncClient.

Concise problem/solution:

import asyncio
from typing import Callable

Callback = Callable[[int], None]


class SomeSystem:
    """Some library you don't control that is mostly async, but provides a callback that
    is _not_ async."""

    def __init__(self, callback: Callback):
        self._callback = callback

    async def do_something(self):
        """do some work and then call the non-async callback"""
        await asyncio.sleep(1.0)
        self._callback(1)
        await asyncio.sleep(1.0)
        self._callback(2)


async def some_async_method(value: int):
    """some long-running operation normally called by async code"""
    await asyncio.sleep(0.1)
    print(f"long-running: {value}")


async def main():
    """main is async and started as normal with asyncio.run"""
    print("BEGIN main")

    loop = asyncio.get_running_loop()

    def cb(value: int) -> None:
        """This method _cannot_ be async, due to the underlying implementation of SomeSystem."""
        # some_async_method(value)  # RuntimeWarning: coroutine 'some_async_method' was never awaited
        asyncio.run_coroutine_threadsafe(some_async_method(value), loop)  # okay

    system = SomeSystem(cb)
    await system.do_something()

    # maybe ensure the last call to async method is awaited? Without this call, the final callback
    # won't be handled, since it's never being awaited. If anyone knows how to properly wait
    # for this, let me know in the comments!
    await asyncio.sleep(1.0)

    print("END main")


if __name__ == "__main__":
    asyncio.run(main())

Output

BEGIN main
long-running: 1
long-running: 2
END main

Upvotes: 5

Matthew D. Scholefield
Matthew D. Scholefield

Reputation: 3345

There are also some libraries that exist to handle this and always do the right thing. One example is asgiref.sync described here which has methods async_to_sync and sync_to_async for performing these conversions:

from asgiref.sync import async_to_sync

@async_to_sync
async def print_data():
    print(await get_data())

print_data()  # Can be called synchronously

More info from the docs for asgiref.sync:

AsyncToSync lets a synchronous subthread stop and wait while the async function is called on the main thread's event loop, and then control is returned to the thread when the async function is finished.

SyncToAsync lets async code call a synchronous function, which is run in a threadpool and control returned to the async coroutine when the synchronous function completes.

There are also other similar projects like koil

Upvotes: 23

Francis Colas
Francis Colas

Reputation: 3647

@deceze answer is probably the best you can do in Python 3.6. But in Python 3.7, you could directly use asyncio.run in the following way:

newfeature = asyncio.run(main(urls))

It will properly create, handle, and close an event_loop.

Upvotes: 81

deceze
deceze

Reputation: 522382

You would use an event loop to execute the asynchronous function to completion:

newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))

(This technique is already used inside main. And I'm not sure why, since main is async you could/should use await fetch_all(...) there.)

Upvotes: 36

Related Questions