user6037143

Reputation: 566

Python asyncio - Increase the value of Semaphore

I am making use of aiohttp in one of my projects and would like to limit the number of requests made per second. I am using asyncio.Semaphore to do that. My challenge is I may want to increase/decrease the number of requests allowed per second.

For example:

limit = asyncio.Semaphore(10)
async with limit:
    async with aiohttp.request(...):
        ...
    await asyncio.sleep(1)

This works great. That is, it limits aiohttp.request to 10 concurrent requests per second. However, I may want to increase or decrease the Semaphore._value at runtime. I can do limit._value = 20, but I'm not sure if this is the right approach or if there is another way to do it.

Upvotes: 3

Views: 2052

Answers (3)

jeffkmeng

Reputation: 879

There actually is a way to do this using only the public API. Under the hood, a semaphore is just a counter tracking the number of free "slots" for some resource (in this case, the number of requests that you can make concurrently).

The idea is that when you want to use the resource, you decrement the counter by 1 to mark that you're claiming one of the slots. If the counter is currently 0, there are no free slots left, so you can't use the resource yet; you wait until the counter becomes positive again, then decrement it to claim a slot. When you're done using the resource, you increment the counter by 1 to mark that you've freed your slot.

When you enter a semaphore called limit via async with, its limit.acquire() coroutine is awaited, which tries to claim one of the slots -- it waits until the counter is greater than zero, and then subtracts one from the counter. When the block exits, limit.release() is called, which marks that you're done with a slot by adding one to the counter.

Now that we know how a semaphore works, we can see that we can also call limit.release() manually, and there's nothing stopping us from increasing the counter above what it initially started as.¹

For example:

limit = asyncio.Semaphore(10)

for i in range(10):
    limit.release()

# limit now has 20 slots

# ...

# you can also decrement
for i in range(5):
    await limit.acquire()

# limit now has 15 slots

Unfortunately, since you have to call the method once each time you want to increment the counter, if you want to change the limit by large amounts, this won’t be efficient and one of the other answers may still be better. However, know that it is possible with the API.

¹ It’s for this very reason that there also exists the BoundedSemaphore, designed to mitigate bugs that might happen from accidentally changing the value of the semaphore.

Upvotes: 1

kakarukeys

Reputation: 23641

Using workers and queues is a more complex solution: you have to think about setup, teardown, exception handling, backpressure, and so on.

A semaphore can also be implemented on top of a Lock. If you don't mind a bit of inefficiency (you will see why), here's a simple implementation of a semaphore whose value can be changed at runtime:

class DynamicSemaphore:
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")

        self._lock = asyncio.Lock()
        self.value = value

    async def __aenter__(self):
        await self.acquire()
        return None

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

    def locked(self):
        return self.value <= 0

    async def acquire(self):
        async with self._lock:
            # polling while holding the lock is the inefficiency mentioned
            # above: waiters are served strictly one at a time
            while self.value <= 0:
                await asyncio.sleep(0.1)
            # decrement inside the lock so two waiters can't claim one slot
            self.value -= 1
        return True

    def release(self):
        self.value += 1

Upvotes: 0

user4815162342

Reputation: 155670

Accessing the private _value attribute is not the right approach, for at least two reasons: first, the attribute is private and can be removed, renamed, or change meaning in a future version without notice; second, increasing the limit won't wake up waiters already blocked on the semaphore.

Since asyncio.Semaphore doesn't support modifying the limit dynamically, you have two options: implementing your own Semaphore class that does support it, or not using a Semaphore at all. The latter is probably easier as you can always replace a semaphore-enforced limit with a fixed number of worker tasks that receive jobs through a queue. Assuming you currently have code that looks like this:

async def fetch(limit, arg):
    async with limit:
        # your actual code here
        return result

async def tweak_limit(limit):
    # here you'd like to be able to increase the limit
    ...

async def main():
    limit = asyncio.Semaphore(10)
    asyncio.create_task(tweak_limit(limit))
    results = await asyncio.gather(*[fetch(limit, x) for x in range(1000)])

You could express it without a semaphore by creating workers in advance and giving them work to do:

async def fetch_task(queue, results):
    while True:
        arg = await queue.get()
        # your actual code here
        results.append(result)
        queue.task_done()

async def main():
    # fill the queue with jobs for the workers
    queue = asyncio.Queue()
    for x in range(1000):
        await queue.put(x)

    # create the initial pool of workers
    results = []
    workers = [asyncio.create_task(fetch_task(queue, results))
               for _ in range(10)]
    asyncio.create_task(tweak_limit(workers, queue, results))

    # wait for workers to process the entire queue
    await queue.join()
    # finally, cancel the now-idle worker tasks
    for w in workers:
        w.cancel()

    # results are now available

The tweak_limit() function can now increase the limit simply by spawning new workers:

async def tweak_limit(workers, queue, results):
    while True:
        await asyncio.sleep(1)
        if need_more_workers:
            workers.append(asyncio.create_task(fetch_task(queue, results)))

Upvotes: 1
