Reputation: 1273
I am interested in creating a pool for asynchronous function calls (they will be HTTP requests); however, I would like to do everything in a single thread. The reason is that spawning multiple threads would waste resources (each thread does nothing but wait for the response).
import asyncio
import aiohttp
import some_library as pool  # the pool library I am looking for

POOL_LIMIT = 3

urls = ["example.com/28409078",
        "example.com/31145880",
        "example.com/54622752",
        "example.com/48008963",
        "example.com/82016326",
        "example.com/75587921",
        "example.com/2988065",
        "example.com/47574087",
        "example.com/13478021",
        "example.com/46041669"]

def get(url):
    # return some promise here
    ...

# now perform the async operations
pool(urls, get, limit=POOL_LIMIT)
Is there a Python library that can manage async pools for me? In Node.js it looks like there is a library that does something close to what I'd like to do: https://github.com/rxaviers/async-pool
Upvotes: 2
Views: 1104
Reputation: 183
The answer from PaxPrz is perfect, but I managed to prettify it a little more, in case anyone's interested:
import asyncio
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")

async def pool(tasks: list[Coroutine[Any, Any, T]], size: int = 3) -> list[T]:
    result: list[T] = []
    # start the first `size` coroutines; keep the rest queued in `tasks`
    pending = [asyncio.create_task(task) for task in tasks[:size]]
    tasks = tasks[size:]
    while pending:
        (done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        # top the pool back up from the queue
        while tasks and len(pending) < size:
            pending.add(asyncio.create_task(tasks.pop(0)))
        result.extend([task.result() for task in done])
    return result
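A quick way to try it (work here is just a stand-in coroutine for illustration):

import asyncio

async def work(index: int, sleep_time: float) -> str:
    # stand-in for a real HTTP request
    await asyncio.sleep(sleep_time)
    return f"task {index} done"

# run at most 3 of the 10 coroutines at a time;
# results come back in completion order, not submission order
results = asyncio.run(pool([work(i, 1) for i in range(10)], size=3))
print(results)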
Upvotes: 0
Reputation: 1928
Here I've implemented a working pool using basic asyncio functions.

Code:
import asyncio

async def pool(tasks, maxsize=3):
    # start up to `maxsize` tasks; the rest stay queued in `tasks`
    pending = [asyncio.create_task(tasks.pop(0)) for _ in range(maxsize) if tasks]
    while pending:
        (done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        # refill the pool from the remaining tasks
        while True:
            if (not tasks) or (len(pending) >= maxsize):
                break
            pending.add(asyncio.create_task(tasks.pop(0)))
        for task in done:
            print(task.result())
    print("POOL COMPLETED")
As an example, you can create tasks and run the pool like this:
async def work(index, sleep_time):
    await asyncio.sleep(sleep_time)
    return f"task {index} done"

tasks = [work(i, 1) for i in range(10)]
Now, to run the tasks, call asyncio.run:
asyncio.run(pool(tasks, 3))
This will run at most 3 tasks concurrently.
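Because at most 3 of the 10 one-second tasks run at once, the whole pool should finish in roughly 4 seconds rather than 10. A quick (untested) way to check, reusing the work and pool defined above:

import time

async def timed_demo():
    start = time.perf_counter()
    # 10 one-second tasks, at most 3 at a time -> about 4 rounds
    await pool([work(i, 1) for i in range(10)], maxsize=3)
    print(f"elapsed: {time.perf_counter() - start:.1f} seconds")

asyncio.run(timed_demo())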
Upvotes: 3
Reputation: 54984
I don't know if there's a popular library for this. Here's a straightforward way to do it:
import asyncio

async def get(url):
    # return some promise here
    ...

async def processQueue():
    # each worker keeps pulling URLs until the shared list is empty
    while len(urls):
        url = urls.pop()
        await get(url)

async def main():
    # three workers share the same list, so at most 3 requests run at once
    await asyncio.gather(
        processQueue(),
        processQueue(),
        processQueue()
    )

asyncio.run(main())
You might need a lock around the pop(); I'm not sure.
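Since asyncio only switches tasks at await points and there is no await between the len() check and the pop(), a lock shouldn't actually be necessary. A variation that makes this explicit (a rough, untested sketch reusing get and urls from above, with worker replacing processQueue) is to drain an asyncio.Queue instead of a plain list:

import asyncio

async def worker(queue):
    # each worker pulls URLs until the queue is empty
    while not queue.empty():
        url = queue.get_nowait()
        await get(url)

async def main():
    queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)
    # three workers -> at most 3 requests in flight
    await asyncio.gather(worker(queue), worker(queue), worker(queue))

asyncio.run(main())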
Upvotes: 0