Reputation: 33223
I currently have a for loop as follows:
async def process(tasks, num):
    count = 0
    results = []
    for task in tasks:
        if count >= num:
            break
        result = await some_async_task(task)
        if result == 'foo':
            continue
        results.append(result)
        count += 1
I was wondering if I could use the gather or wait_for primitives here, but I am not sure how to implement this if/continue/break logic with them. In particular, I don't want to unnecessarily await a task once count >= num: if there are 20 tasks and num = 4, I don't want to run all 20 tasks.
Upvotes: 2
Views: 1789
Reputation: 14360
Well, I really think this question and the selected answer are worth an analysis.
First off, be aware that whether you can skip awaiting the remaining tasks depends on the results of the ones already awaited. That means you may potentially have to await all your tasks in order to get the first N results you are interested in.
Having said that, the selected answer has the following characteristics:
- If num is comparable to the total number of tasks, the use of asyncio makes less sense.
- If num is much smaller than the total number of tasks, the solution would be better written using asyncio.as_completed, so you can get the first N results matching your condition.
Think about this case: you want the first result (only one) matching some condition out of 1000 requests! Here the selected answer is no better than a synchronous implementation; in fact, it is worse, since it introduces unnecessary complications. With the selected solution this will always be the case once len(results) == num - 1, since then the batch size num - len(results) equals 1, making the whole thing conceptually synchronous due to next_batch always having only one element!
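To see that degenerate case concretely, here is a minimal sketch (plain integers stand in for the remaining tasks) of the batch expression used in the selected answer:
import itertools

num = 4
results = ['foo', 'foo', 'foo']  # len(results) == num - 1
task_iter = iter(range(20))      # stand-in for the remaining tasks
next_batch = tuple(itertools.islice(task_iter, num - len(results)))
print(next_batch)                # (0,) -- a single "task", awaited alone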
Another thing: using the selected answer does not prevent "unnecessary awaits" either. For instance, if you want 100 matching results out of 300 coroutines, you might easily end up running a third batch of 100, i.e. running every one of the previously "not wanted" coroutines/tasks!
If not running extra awaits is a must (perhaps to save money on a very expensive query to a DB or on AWS Lambda requests), you might want to sacrifice speed and run synchronously (no asyncio).
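For illustration, a minimal synchronous sketch (some_sync_task and process_sync are hypothetical names standing in for a blocking version of the task) that never starts a single task more than needed:
def some_sync_task(arg):
    # Hypothetical blocking task: replace with your real call.
    return 'foo' if arg % 2 else 'bar'

def process_sync(task_args, num):
    # Runs tasks one at a time and stops as soon as num matches are found,
    # so no extra task is ever started.
    results = []
    for arg in task_args:
        result = some_sync_task(arg)
        if result == 'foo':
            results.append(result)
        if len(results) >= num:
            break
    return results

print(process_sync(range(20), 4))  # ['foo', 'foo', 'foo', 'foo']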
On the other hand, if you are only interested in getting the first N matching results as fast as possible, asyncio.as_completed is the way to go.
Here is an example ...
import asyncio
import random
import time

# Maximum delay any single coroutine sleeps for.
max_time_waited = 0
# Counter for the number of coroutines called.
call_counter = 0
# Aux variable for avoiding race conditions when editing global variables.
# No need for this in your code; it is just for illustrative purposes.
lock = asyncio.Lock()

# Some task.
async def coro(task_number):
    global max_time_waited
    global call_counter
    delay = random.randint(1, 10)
    async with lock:
        max_time_waited = max(delay, max_time_waited)
        call_counter += 1
    await asyncio.sleep(delay)
    return "foo" if task_number % 2 else "bar"
async def process(tasks, num):
    interesting_results = []
    for task in asyncio.as_completed(tasks):
        result = await task
        if result == "foo":
            interesting_results.append(result)
        if len(interesting_results) >= num:
            break
    return interesting_results

async def main(number):
    tasks = [
        coro(task_number)
        for task_number in range(100)
    ]
    return await process(tasks, number)
init = time.perf_counter()
result = asyncio.run(main(4))
print(f"Result obtained in {time.perf_counter() - init} seconds")
print(f"Possible max time waiting: {max_time_waited} seconds")
print(f"Total coroutine calls: {call_counter}")
print(result)
Result obtained in 1.009999111003708 seconds
Possible max time waiting: 10 seconds
Total coroutine calls: 100
['foo', 'foo', 'foo', 'foo']
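Note that, as the "Total coroutine calls" line shows, all 100 coroutines still get scheduled: asyncio.as_completed shortens the time you wait for the first num matches, but it does not avoid starting the remaining tasks.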
Upvotes: 0
Reputation: 13415
This can easily be achieved using the aiostream library. Here is a working example:
import asyncio
from random import random

from aiostream import stream, pipe

async def some_async_task(i):
    await asyncio.sleep(random())
    return i if random() < 0.2 else None

async def process(task_args, n):
    return await (
        stream.iterate(task_args)
        | pipe.map(some_async_task, task_limit=n)
        | pipe.filter(bool)
        | pipe.take(n)
        | pipe.list()
    )

async def main():
    print(await process(task_args=range(100), n=10))

if __name__ == "__main__":
    asyncio.run(main())
The program prints the list of the first 10 tasks that succeeded:
[1, 8, 16, 18, 19, 37, 42, 43, 45, 47]
Also notice that you may tune the number of some_async_task coroutines that can run at the same time using the task_limit argument.
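For instance, here is a hypothetical variation (process_eager and concurrency are made-up names) that decouples the concurrency limit from the number of desired results:
async def process_eager(task_args, n, concurrency=20):
    # Same pipeline, but up to `concurrency` tasks may run at once,
    # while pipe.take(n) still ends the stream after the first n matches.
    return await (
        stream.iterate(task_args)
        | pipe.map(some_async_task, task_limit=concurrency)
        | pipe.filter(bool)
        | pipe.take(n)
        | pipe.list()
    )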
Disclaimer: I am the project maintainer.
Upvotes: 3
Reputation: 154911
You could process the tasks in batches whose size equals the number of results you still need. If you give such a batch to asyncio.gather(), it will both run the tasks in parallel and preserve the order of results. For example:
import asyncio
import itertools

async def process(tasks, num):
    results = []
    task_iter = iter(tasks)
    while len(results) < num:
        # Take only as many tasks as results are still missing.
        next_batch = tuple(itertools.islice(task_iter, num - len(results)))
        if len(next_batch) == 0:
            break
        batch_results = await asyncio.gather(*next_batch)
        results.extend(r for r in batch_results if r == 'foo')
    return results
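For completeness, here is a hypothetical usage sketch; some_async_task is a made-up stand-in that returns 'foo' about half the time, mirroring the question:
import random

async def some_async_task(arg):
    # Hypothetical stand-in task: sleeps briefly, sometimes returns 'foo'.
    await asyncio.sleep(random.random())
    return 'foo' if random.random() < 0.5 else 'bar'

async def main():
    tasks = [some_async_task(i) for i in range(20)]
    print(await process(tasks, 4))  # e.g. ['foo', 'foo', 'foo', 'foo']

asyncio.run(main())
One caveat: any coroutines the loop never reaches are left un-awaited, so Python may emit "coroutine was never awaited" warnings for them when they are garbage-collected.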
Upvotes: 2