Reputation: 1297
So I'm locked to a python 3.6.2 interpreter that follows my desktop application.
What I want is to call an async function from a synchronized method or function.
When calling the python function from the desktop application it has to be a normal function which can not be awaited.
From the desktop application I am able to send a list of urls, and what I want is to send back response from every url in an async matter.
here is my try I've marked the SyntaxError which I don't know how to bypass.
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10
class FeatureProcessor(object):
def __init__(self):
pass
def input(self, feature):
urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
feature.getAttribute('_list{}._xmin'),\
feature.getAttribute('_list{}._ymin'),\
feature.getAttribute('_list{}._xmax'),\
feature.getAttribute('_list{}._ymax'))
-> SyntaxError: newfeature = await main(urls_and_coords)
self.pyoutput(newfeature)
def close(self):
pass
async def main(urls):
loop = asyncio.get_event_loop()
async with aiohttp.ClientSession(loop=loop) as session:
feature = loop.run_until_complete(fetch_all(session, urls, loop))
return feature
async def fetch_all(session, urls, loop):
results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
return results
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url[0]) as response:
newFeature = fmeobjects.FMEFeature()
response_data = await response
newFeature.setAttribute('response', response_data)
newFeature.setAttribute('_xmin',url[1])
newFeature.setAttribute('_xmax',url[2])
newFeature.setAttribute('_ymin',url[3])
newFeature.setAttribute('_ymax',url[4])
return newFeature
I have tried making these changes:
import fme
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
class FeatureProcessor(object):
def __init__(self):
pass
def input(self, feature):
urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
feature.getAttribute('_list{}._xmin'),\
feature.getAttribute('_list{}._ymin'),\
feature.getAttribute('_list{}._xmax'),\
feature.getAttribute('_list{}._ymax'))
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(loop, urls_and_coords))
#feature.setAttribute('result',result)
self.pyoutput(feature)
def close(self):
pass
async def main(loop, urls):
async with aiohttp.ClientSession(loop=loop) as session:
return await fetch_all(session, urls, loop)
async def fetch_all(session, urls, loop):
results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
return results
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url[0]) as response:
#newFeature = fmeobjects.FMEFeature()
response = await response
#newFeature.setAttribute('response', response_data)
#newFeature.setAttribute('_xmin',url[1])
#newFeature.setAttribute('_xmax',url[2])
#newFeature.setAttribute('_ymin',url[3])
#newFeature.setAttribute('_ymax',url[4])
return response, url[1], url[2], url[3], url[4]
but now I end up with this error:
Python Exception <TypeError>: object ClientResponse can't be used in 'await'
expression
Traceback (most recent call last):
File "<string>", line 20, in input
File "asyncio\base_events.py", line 467, in run_until_complete
File "<string>", line 29, in main
File "<string>", line 33, in fetch_all
File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression
Upvotes: 89
Views: 127073
Reputation: 81
I used the following code to call async functions using ros1 callbacks.
Wrap the async functions in a decorator with context to the "loop" variable and executing "run_coroutine_threadsafe".
The wrapped functions can then be used directly inside the ros1 sub callback.
This method creates a separate thread to run the async loop.
rospy.spin() and other blocking calls would prevent the async loop from functioning otherwise.
from contextlib import contextmanager
from asyncio import AbstractEventLoop as Loop
import asyncio
from threading import Thread
import time
@contextmanager
def async_context():
"""
Creats an async loop on a seperate thread
Run tasks on async loop from other threads with:
asyncio.run_coroutine_threadsafe(async_function(*arg, **kwargs), loop)
"""
def start_event_loop(loop: Loop):
asyncio.set_event_loop(loop)
loop.run_forever()
try:
loop = asyncio.new_event_loop()
loop_thread = Thread(target=start_event_loop, args=(loop,), daemon=True)
loop_thread.start()
yield loop
finally:
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
async def example_async_function(time: float):
print("async sleep: started")
await asyncio.sleep(time)
print("async sleep: done")
def main():
with async_context() as loop:
asyncio.run_coroutine_threadsafe(example_async_function(2), loop)
print("sync sleep: started")
time.sleep(5)
print("sync sleep: done")
if __name__ == "__main__":
main()
Upvotes: 0
Reputation: 9853
I was able to get this working in pure python 3.10 using the built-in asyncio.run_coroutine_threadsafe
.
This is new to me, so there are probably some caveats, e.g. since the async method is not actually awaited, the process could (will) exit before the callback completes (unless you do something to ensure it doesn't).
For a reference on where this might occur in the wild, see the bleak
BLE library class BleakClient
callback method disconnected_callback
. Then, in the callback try to emit
using the async version of socket.io
client, AsyncClient
.
Concise problem/solution:
import asyncio
from typing import Callable
Callback = Callable[[int], None]
class SomeSystem:
"""Some library you don't control that is mostly async, but provides a callback that
is _not_ async."""
def __init__(self, callback: Callback):
self._callback = callback
async def do_something(self):
"""do some work and then call the non-async callback"""
await asyncio.sleep(1.0)
self._callback(1)
await asyncio.sleep(1.0)
self._callback(2)
async def some_async_method(value: int):
"""some long-running operation normally called by async code"""
await asyncio.sleep(0.1)
print(f"long-running: {value}")
async def main():
"""main is async and started as normal with asyncio.run"""
print("BEGIN main")
loop = asyncio.get_running_loop()
def cb(value: int) -> None:
"""This method _cannot_ be async, due to the underlying implementation of SomeSystem."""
# some_async_method(value) # RuntimeWarning: coroutine 'some_async_method' was never awaited
asyncio.run_coroutine_threadsafe(some_async_method(value), loop) # okay
system = SomeSystem(cb)
await system.do_something()
# maybe ensure the last call to async method is awaited? Without this call, the final callback
# won't be handled, since it's never being awaited. If anyone knows how to properly wait
# for this, let me know in the comments!
await asyncio.sleep(1.0)
print("END main")
if __name__ == "__main__":
asyncio.run(main())
Output
BEGIN main
long-running: 1
long-running: 2
END main
Upvotes: 5
Reputation: 3345
There are also some libraries that exist to handle this and always do the right thing. One example is asgiref.sync
described here which has methods async_to_sync
and sync_to_async
for performing these conversions:
from asgiref.sync import async_to_sync
@async_to_sync
async def print_data():
print(await get_data())
print_data() # Can be called synchronously
More info from the docs for asgiref.sync
:
AsyncToSync lets a synchronous subthread stop and wait while the async function is called on the main thread's event loop, and then control is returned to the thread when the async function is finished.
SyncToAsync lets async code call a synchronous function, which is run in a threadpool and control returned to the async coroutine when the synchronous function completes.
There are also other similar projects like koil
Upvotes: 23
Reputation: 3647
@deceze answer is probably the best you can do in Python 3.6.
But in Python 3.7, you could directly use asyncio.run
in the following way:
newfeature = asyncio.run(main(urls))
It will properly create, handle, and close an event_loop
.
Upvotes: 81
Reputation: 522382
You would use an event loop to execute the asynchronous function to completion:
newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))
(This technique is already used inside main
. And I'm not sure why, since main
is async
you could/should use await fetch_all(...)
there.)
Upvotes: 36