Reputation: 41
I'd like to use asyncio to do a lot of simultaneous non-blocking IO in Python. However, I want that use of asyncio to be abstracted away from the user: under the hood there are a lot of asynchronous calls going on simultaneously to speed things up, but the user sees a single, synchronous call.
Basically something like this:
async def _slow_async_fn(address):
    data = await async_load_data(address)
    return data

def synchronous_blocking_io():
    addresses = ...
    tasks = []
    for address in addresses:
        tasks.append(_slow_async_fn(address))
    all_results = some_fn(asyncio.gather(*tasks))
    return all_results
The problem is, how can I achieve this in a way that's agnostic to the user's running environment? If I use a pattern like asyncio.get_event_loop().run_until_complete(), I run into issues when the code is called inside an environment like Jupyter, where there's already an event loop running. Is there a way to robustly gather the results of a set of asynchronous tasks that doesn't require pushing async/await statements all the way up the program?
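To be concrete, here is roughly the naive version (async_load_data and _gather_all here are stand-ins I'm using for illustration, not the real IO code). asyncio.run() works when called from a plain script, but raises "RuntimeError: asyncio.run() cannot be called from a running event loop" when the same function is called from a Jupyter cell, because Jupyter already runs an event loop in the main thread.

import asyncio

async def async_load_data(address):
    # Stand-in for the real non-blocking IO call.
    await asyncio.sleep(0.1)
    return f"data from {address}"

async def _slow_async_fn(address):
    return await async_load_data(address)

async def _gather_all(addresses):
    return await asyncio.gather(*(_slow_async_fn(a) for a in addresses))

def synchronous_blocking_io(addresses):
    # Fine from a plain script; raises
    # "RuntimeError: asyncio.run() cannot be called from a running event loop"
    # when there is already a loop running in this thread (e.g. in Jupyter).
    return asyncio.run(_gather_all(addresses))

print(synchronous_blocking_io(["a", "b", "c"]))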
Upvotes: 0
Views: 125
Reputation: 106
The restriction on running loops is per thread, so running a new event loop is possible, as long as it is in a new thread.
import asyncio
import concurrent.futures

async def gatherer_of(tasks):
    # It's necessary to wrap asyncio.gather() in a coroutine (reasons beyond scope)
    return await asyncio.gather(*tasks)

def synchronous_blocking_io():
    addresses = ...
    tasks = []
    for address in addresses:
        tasks.append(_slow_async_fn(address))
    loop = asyncio.new_event_loop()
    return loop.run_until_complete(gatherer_of(tasks))

def synchronous_blocking_io_wrapper():
    # Run the blocking helper in a fresh thread so its new event loop
    # cannot clash with a loop that may already be running in this thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        fut = executor.submit(synchronous_blocking_io)
        return fut.result()

# Testing
async def async_runner():
    # Simulating execution from a running loop
    return synchronous_blocking_io_wrapper()

# Run from synchronous client
# print(synchronous_blocking_io_wrapper())

# Run from async client
# print(asyncio.run(async_runner()))
The same result can be achieved with a ProcessPoolExecutor, by manually running synchronous_blocking_io in a new thread and joining it, by starting an entirely new process, and so forth. As long as you are not in the same thread, you won't conflict with any running event loop.
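A minimal sketch of the manual-thread variant, assuming the synchronous_blocking_io() defined above (the worker function and the result-list plumbing are my own additions, not part of the original answer):

import threading

def synchronous_blocking_io_in_thread():
    result = []  # Thread.join() returns nothing, so collect the result here.

    def worker():
        result.append(synchronous_blocking_io())

    t = threading.Thread(target=worker)
    t.start()
    t.join()  # Block until the new event loop in the worker thread has finished.
    return result[0]

Note that, unlike the ThreadPoolExecutor version, an exception raised in the worker won't propagate to the caller automatically, which is one reason the executor approach is usually the more convenient choice.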
Upvotes: 1