Reputation: 12099
The asyncio Futures documentation states:
Future objects are used to bridge low-level callback-based code with high-level async/await code.
Is there a canonical example of how this is done?
To make the example more concrete, suppose we want to wrap the following function, which is typical of a callback-based API. To be clear: this function cannot be modified - pretend it is some complex third party library (that probably uses threads internally that we can't control) that wants a callback.
import threading
import time

def callback_after_delay(secs, callback, *args):
    """Call callback(*args) after sleeping for secs seconds"""
    def _target():
        time.sleep(secs)
        callback(*args)
    thread = threading.Thread(target=_target)
    thread.start()
We would like to be able to use our wrapper function like:
async def main():
    await aio_callback_after_delay(10., print, "Hello, World")
Upvotes: 2
Views: 512
Reputation: 1437
Just use a ThreadPoolExecutor. The code doesn't change except for how you kick off the thread. If you remove "return_exceptions" from the gather call, the first exception propagates out of the await and is printed with its full traceback, so it's up to you which behaviour you want.
import asyncio
import random
import time
from concurrent.futures import ThreadPoolExecutor

def cb():
    print("cb called")

def blocking():
    if random.randint(0, 3) == 1:
        raise ValueError("Random Exception!")
    time.sleep(1)
    cb()
    return 5

async def run(loop):
    futs = []
    executor = ThreadPoolExecutor(max_workers=5)
    for x in range(5):
        future = loop.run_in_executor(executor, blocking)
        futs.append(future)
    res = await asyncio.gather(*futs, return_exceptions=True)
    for r in res:
        if isinstance(r, Exception):
            print("Exception:", r)

loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
Output
cb called
cb called
cb called
Exception: Random Exception!
Exception: Random Exception!
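To make the effect of "return_exceptions" concrete, here is a minimal sketch rather than part of the answer's code. It assumes Python 3.7+ (for asyncio.run and asyncio.get_running_loop), and the blocking(n) job that fails for odd n is a hypothetical stand-in chosen so the behaviour is deterministic.

```python
import asyncio


def blocking(n):
    """Hypothetical blocking job: fails for odd n, otherwise returns n * 10."""
    if n % 2:
        raise ValueError(f"job {n} failed")
    return n * 10


async def collect(return_exceptions):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    futs = [loop.run_in_executor(None, blocking, n) for n in range(4)]
    return await asyncio.gather(*futs, return_exceptions=return_exceptions)


if __name__ == "__main__":
    # With return_exceptions=True, failures come back as values in the result list.
    print(asyncio.run(collect(True)))
    # Without it, the await itself raises one of the failures.
    try:
        asyncio.run(collect(False))
    except ValueError as exc:
        print("gather raised:", exc)
```

With return_exceptions=True the results keep their submission order, so each exception object sits at the index of the job that failed. Without it, only one exception reaches the caller and the other results are not returned.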
Upvotes: 0
Reputation: 12099
Below is a complete self-contained example to demonstrate one approach. It takes care of running the callback on the asyncio thread, and handles exceptions raised from the callback.
Works in Python 3.6.6. I wonder about using asyncio.get_event_loop() here. We need a loop because loop.create_future() is the preferred way to create futures in asyncio. However, in 3.7 we should prefer asyncio.get_running_loop(), which raises a RuntimeError if no loop is running. Perhaps the best approach is to pass the loop in to aio_callback_after_delay explicitly, but this does not match existing asyncio code, which often makes the loop an optional keyword argument. Clarification on this point, or any other improvements, would be appreciated!
import asyncio
import threading
import time

# This is the callback code we are trying to bridge
def callback_after_delay(secs, callback, *args):
    """Call callback(*args) after sleeping for secs seconds"""
    def _target():
        time.sleep(secs)
        callback(*args)
    thread = threading.Thread(target=_target)
    thread.start()

# This is our wrapper
async def aio_callback_after_delay(secs, callback, *args):
    loop = asyncio.get_event_loop()
    f = loop.create_future()
    def _inner():
        # Runs on the event loop thread, scheduled via call_soon_threadsafe
        try:
            f.set_result(callback(*args))
        except Exception as ex:
            f.set_exception(ex)
    callback_after_delay(secs, loop.call_soon_threadsafe, _inner)
    return await f

#
# Below is test code to demonstrate things work
#
async def test_aio_callback_after_delay():
    print('Before!')
    await aio_callback_after_delay(1., print, "Hello, World!")
    print('After!')

async def test_aio_callback_after_delay_exception():
    def callback():
        raise RuntimeError()
    print('Before!')
    await aio_callback_after_delay(1., callback)
    print('After!')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Basic test
    print('Basic Test')
    loop.run_until_complete(test_aio_callback_after_delay())

    # Test our implementation is truly async
    print('Truly Async!')
    loop.run_until_complete(
        asyncio.gather(
            *(test_aio_callback_after_delay() for i in range(0, 5))
        )
    )

    # Test exception handling
    print('Exception Handling')
    loop.run_until_complete(test_aio_callback_after_delay_exception())
The output is something like:
Basic Test
Before!
Hello, World!
After!
Truly Async!
Before!
Before!
Before!
Before!
Before!
Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!
After!
After!
After!
After!
After!
Exception Handling
Before!
Traceback (most recent call last):
  File "./scratch.py", line 60, in <module>
    loop.run_until_complete(test_aio_callback_after_delay_exception())
  File "\lib\asyncio\base_events.py", line 468, in run_until_complete
    return future.result()
  File "./scratch.py", line 40, in test_aio_callback_after_delay_exception
    await aio_callback_after_delay(1., callback)
  File "./scratch.py", line 26, in aio_callback_after_delay
    return await f
  File "./scratch.py", line 21, in _inner
    f.set_result(callback(*args))
  File "./scratch.py", line 37, in callback
    raise RuntimeError()
RuntimeError
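On the get_event_loop() versus get_running_loop() question, one possible Python 3.7+ variant of the wrapper is sketched below. It is an illustration under stated assumptions, not a definitive implementation: the cancelled() guard is an addition that the original wrapper does not have (it skips the callback if the awaiting task was cancelled in the meantime), and main() uses threading.get_ident as the callback purely to check which thread the callback runs on.

```python
import asyncio
import threading
import time


def callback_after_delay(secs, callback, *args):
    """Stand-in for the third-party API: call callback(*args) from a worker thread."""
    def _target():
        time.sleep(secs)
        callback(*args)
    threading.Thread(target=_target).start()


async def aio_callback_after_delay(secs, callback, *args):
    # get_running_loop() raises RuntimeError when no loop is running;
    # inside a coroutine there always is one.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def _inner():
        # Runs on the event loop thread, so touching the future here is safe.
        if fut.cancelled():  # assumption: skip the callback after cancellation
            return
        try:
            fut.set_result(callback(*args))
        except Exception as ex:
            fut.set_exception(ex)

    # The worker thread only schedules _inner; it never touches the future itself.
    callback_after_delay(secs, loop.call_soon_threadsafe, _inner)
    return await fut


async def main():
    loop_thread = threading.get_ident()
    # The callback reports the id of the thread it was executed on.
    callback_thread = await aio_callback_after_delay(0.1, threading.get_ident)
    return loop_thread == callback_thread


if __name__ == "__main__":
    print(asyncio.run(main()))  # True: the callback ran on the event loop thread
```

Because the wrapper is itself a coroutine, the loop never needs to be passed in explicitly: whichever loop is running the coroutine is the one that get_running_loop() returns.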
Upvotes: 2