Reputation: 32111
I'm submitting coroutines to an event loop in a separate thread. This all works well when I wait on each future in sequence with future.next()
. But I want to now wait on the first completed future in a list of futures. I'm trying to use asyncio.wait(...)
for that, but I appear to be using it incorrectly.
Below is a simplified example. I'm getting the exception TypeError: An asyncio.Future, a coroutine or an awaitable is required
at the line done, pending = future.result()
.
This works if I pass [c1, c2, c3]
to asyncio.wait([c1, c2, c3], return_when=asyncio.FIRST_COMPLETE)
, but I am submitting tasks at random times, so I only can gather the set of futures, not the original tasks. And the documentation clearly states that you can use futures.
coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).
import asyncio
import threading
async def generate():
await asyncio.sleep(10)
return 'Hello'
def run_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
event_loop = asyncio.get_event_loop()
threading.Thread(target=lambda: run_loop(event_loop)).start()
c1 = generate() # submitted at a random time
c2 = generate() # submitted at a random time
c3 = generate() # submitted at a random time
f1 = asyncio.run_coroutine_threadsafe(c1, event_loop)
f2 = asyncio.run_coroutine_threadsafe(c2, event_loop)
f3 = asyncio.run_coroutine_threadsafe(c3, event_loop)
all_futures = [f1, f2, f3]
# I'm doing something wrong in these 3 lines
waitable = asyncio.wait(all_futures, return_when=asyncio.FIRST_COMPLETED)
future = asyncio.run_coroutine_threadsafe(waitable, event_loop)
done, pending = future.result() # This returns my TypeError exception
for d in done:
print(d.result())
Upvotes: 1
Views: 5236
Reputation: 32111
This answer was instrumental in helping answer this:
Create generator that yields coroutine results as the coroutines finish
asyncio.wait(...)
can't take futures, only coroutines and awaitables that have NOT yet been scheduled. The correct way to do this is with a callback. When the future finishes it can just add itself to a thread safe queue and you can pull from that queue. The example below fixes the problem in the question:
import asyncio
import threading
import queue
import random
async def generate(i):
await asyncio.sleep(random.randint(5, 10))
return 'Hello {}'.format(i)
def run_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def done(fut):
q.put(fut)
event_loop = asyncio.get_event_loop()
threading.Thread(target=lambda: run_loop(event_loop)).start()
q = queue.Queue()
c1 = generate(1)
c2 = generate(2)
c3 = generate(3)
f1 = asyncio.run_coroutine_threadsafe(c1, event_loop)
f2 = asyncio.run_coroutine_threadsafe(c2, event_loop)
f3 = asyncio.run_coroutine_threadsafe(c3, event_loop)
f1.add_done_callback(lambda fut: q.put(fut))
f2.add_done_callback(lambda fut: q.put(fut))
f3.add_done_callback(lambda fut: q.put(fut))
print(q.get().result())
print(q.get().result())
print(q.get().result())
Upvotes: -1
Reputation: 155610
asyncio.wait
expects asyncio futures and works inside an event loop. To wait for multiple concurrent.futures
futures (and outside of an event loop), use concurrent.futures.wait
instead:
done, pending = concurrent.futures.wait(
all_futures, return_when=concurrent.futures.FIRST_COMPLETED)
Note that your idea would have worked if you had access to the underlying asyncio futures. For example (untested):
async def submit(coro):
# submit the coroutine and return the asyncio task (future)
return asyncio.create_task(coro)
# ...generate() as before
# note that we use result() to get to the asyncio futures:
f1 = asyncio.run_coroutine_threadsafe(submit(c1), event_loop).result()
f2 = asyncio.run_coroutine_threadsafe(submit(c2), event_loop).result()
f3 = asyncio.run_coroutine_threadsafe(submit(c3), event_loop).result()
# these should be waitable by submitting wait() to the event loop
done, pending = asyncio.run_coroutine_threadsafe(
asyncio.wait([f1, f2, f3], return_when=asyncio.FIRST_COMPLETED)).result()
Upvotes: 3