zfj3ub94rf576hc4eegm

Reputation: 1273

Best way to limit concurrent http requests in python (no threads)?

I am interested in creating a pool for asynchronous function calls (they will be HTTP requests); however, I would like to do everything in a single thread. The reason for this is that spawning multiple threads would waste resources (each thread would do nothing but wait for a response).

import asyncio
import aiohttp

import some_library as pool

POOL_LIMIT = 3

urls = [
    "example.com/28409078",
    "example.com/31145880",
    "example.com/54622752",
    "example.com/48008963",
    "example.com/82016326",
    "example.com/75587921",
    "example.com/2988065",
    "example.com/47574087",
    "example.com/13478021",
    "example.com/46041669",
]

def get(url):
    # return some promise here
    ...

# now perform the async operations
pool(urls, get, limit=POOL_LIMIT)


Is there a python library which can manage async pools for me? In Node.js it looks like there is a library which does something close to what I'd like to do: https://github.com/rxaviers/async-pool
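
For reference, a minimal sketch of the behavior described above using only asyncio's built-in `Semaphore` (no library; `fetch` is a hypothetical stand-in that sleeps instead of making a real HTTP request):

```python
import asyncio

POOL_LIMIT = 3

async def fetch(url, sem):
    async with sem:  # at most POOL_LIMIT fetches run at once
        await asyncio.sleep(0.01)  # stand-in for the actual HTTP request
        return f"fetched {url}"

async def main(urls):
    sem = asyncio.Semaphore(POOL_LIMIT)
    # gather() preserves input order in the returned list
    return await asyncio.gather(*(fetch(u, sem) for u in urls))

results = asyncio.run(main([f"example.com/{i}" for i in range(10)]))
```

All coroutines are created up front, but the semaphore ensures only `POOL_LIMIT` of them are past the `async with` at any moment.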

Upvotes: 2

Views: 1104

Answers (3)

Jordi Reinsma

Reputation: 183

The answer from PaxPrz is perfect, but I managed to prettify it a little more, in case anyone's interested:

import asyncio
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


async def pool(tasks: list[Coroutine[Any, Any, T]], size: int = 3) -> list[T]:
    result: list[T] = []
    # start the first `size` tasks; asyncio.wait() returns sets, so use a set here too
    pending = {asyncio.create_task(task) for task in tasks[:size]}
    tasks = tasks[size:]
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        # top the pool back up as tasks finish
        while tasks and len(pending) < size:
            pending.add(asyncio.create_task(tasks.pop(0)))
        # note: results are collected in completion order, not input order
        result.extend(task.result() for task in done)
    return result
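
A quick self-contained demo of this `pool` (the function body is repeated so the snippet runs on its own; `work` is a stand-in coroutine, not from the question):

```python
import asyncio
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")

async def pool(tasks: list[Coroutine[Any, Any, T]], size: int = 3) -> list[T]:
    # Same pool as above, repeated here for a runnable example.
    result: list[T] = []
    pending = {asyncio.create_task(task) for task in tasks[:size]}
    tasks = tasks[size:]
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while tasks and len(pending) < size:
            pending.add(asyncio.create_task(tasks.pop(0)))
        result.extend(task.result() for task in done)
    return result

async def work(i: int) -> int:
    await asyncio.sleep(0.01)
    return i * 2

# at most 3 of the 10 coroutines run at any one time
results = asyncio.run(pool([work(i) for i in range(10)], size=3))
```

Since results arrive in completion order, sort them if the original order matters.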

Upvotes: 0

PaxPrz
PaxPrz

Reputation: 1928

Here I've implemented a pool using basic asyncio functions.

WORKING:

  • the pool starts with maxsize tasks
  • when the first task completes, it prints its result and adds the next task to the pending set
  • similarly, each time a task completes, another one is added until the pool is back at maxsize

Code:

import asyncio

async def pool(tasks, maxsize=3):
    pending = [asyncio.create_task(tasks.pop(0)) for _ in range(maxsize) if tasks]
    while pending:
        (done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while True:
            if (not tasks) or (len(pending) >= maxsize):
                break
            pending.add(asyncio.create_task(tasks.pop(0)))
        for task in done:
            print(task.result())
    print("POOL COMPLETED")

For an example you can create tasks and pool like here:

async def work(index, sleep_time):
    await asyncio.sleep(sleep_time)
    return f"task {index} done"

tasks = [work(i, 1) for i in range(10)]

Now, to run the tasks, call asyncio.run:

asyncio.run(pool(tasks, 3))

This will run at most 3 tasks concurrently.

Upvotes: 3

pguardiario

Reputation: 54984

I don't know if there's a popular library for this. Here's a straightforward way to do it:

async def get(url):
  # return some promise here
  ...

async def processQueue():
  while len(urls):
    url = urls.pop()
    await get(url)

async def main():
  await asyncio.gather(
    processQueue(),
    processQueue(),
    processQueue()
  )

asyncio.run(main())

You don't actually need a lock before the pop(): everything runs in a single thread, and there's no await between the length check and the pop(), so the event loop can't switch workers in between.
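
A runnable version of this worker pattern (`get` here is a hypothetical stand-in that just sleeps; in practice it would perform the HTTP request):

```python
import asyncio

urls = [f"example.com/{i}" for i in range(10)]
results = []

async def get(url):
    # Stand-in for an HTTP request.
    await asyncio.sleep(0.01)
    return f"got {url}"

async def processQueue():
    # Each worker pulls from the shared list until it's empty.
    # The check and pop() happen with no await in between, so they're safe.
    while len(urls):
        url = urls.pop()
        results.append(await get(url))

async def main():
    # Three workers means at most three requests in flight.
    await asyncio.gather(processQueue(), processQueue(), processQueue())

asyncio.run(main())
```

Each worker loops independently, so the concurrency limit equals the number of `processQueue()` calls passed to `gather`.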

Upvotes: 0
