Jeff Erickson

Reputation: 177

Using a semaphore with asyncio in Python

I am trying to limit the number of simultaneous async functions running using a semaphore, but I cannot get it to work. My code boils down to this:

import asyncio


async def send(i):

    print(f"starting {i}")
    await asyncio.sleep(4)
    print(f"ending {i}")


async def helper():
    async with asyncio.Semaphore(value=5):
        await asyncio.gather(*[
            send(1),
            send(2),
            send(3),
            send(4),
            send(5),
            send(6),
            send(7),
            send(8),
            send(9),
            send(10),
        ])


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(helper())
    loop.close()

The output is:

starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
starting 9
starting 10
ending 1
ending 2
ending 3
ending 4
ending 5
ending 6
ending 7
ending 8
ending 9
ending 10

I expect that only 5 will run at a time; however, all 10 start and stop at the same time. What am I doing wrong?

Upvotes: 10

Views: 10231

Answers (2)

Artiom Kozyrev

Reputation: 3836

Please find a working example below; feel free to ask questions:

import asyncio


async def send(i: int, semaphore: asyncio.Semaphore):
    # to demonstrate that all tasks start nearly together
    print(f"Hello: {i}")
    # only two tasks can run code inside the block below simultaneously
    async with semaphore:
        print(f"starting {i}")
        await asyncio.sleep(4)
        print(f"ending {i}")


async def async_main():
    s = asyncio.Semaphore(value=2)
    await asyncio.gather(*[send(i, semaphore=s) for i in range(1, 11)])


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())
    loop.close()
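A side note on the entrypoint: asyncio.get_event_loop() is deprecated for creating a new loop in recent Python versions. The same example can be sketched with asyncio.run instead (the sleep is shortened here for brevity; everything else mirrors the code above):

```python
import asyncio


async def send(i: int, semaphore: asyncio.Semaphore):
    # only two coroutines can be inside this block at the same time
    async with semaphore:
        print(f"starting {i}")
        await asyncio.sleep(0.1)  # shortened from 4 seconds
        print(f"ending {i}")


async def async_main():
    s = asyncio.Semaphore(value=2)
    await asyncio.gather(*[send(i, semaphore=s) for i in range(1, 11)])


if __name__ == "__main__":
    # asyncio.run creates, runs and closes the event loop for us
    asyncio.run(async_main())
```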

UPDATE FROM 18.08.2023:

I see that many people are interested in how to use asyncio.Semaphore, so I have decided to extend my answer.

The new version illustrates how to use the producer-consumer pattern with asyncio.Semaphore. If you want something very simple, the code from the original answer above is fine. If you need a more robust solution that also limits the number of asyncio.Tasks in flight, use the one below.

import asyncio
from typing import List

CONSUMERS_NUMBER = 10  # workers/consumer number
TASKS_NUMBER = 20  # number of tasks to do


async def producer(tasks_to_do: List[str], q: asyncio.Queue) -> None:
    print("Producer started working!")
    for task in tasks_to_do:
        await q.put(task)  # put tasks to Queue

    # poison pill technique
    for _ in range(CONSUMERS_NUMBER):
        await q.put(None)  # put poison pill to all worker/consumers

    print("Producer finished working!")


async def consumer(
        consumer_name: str,
        q: asyncio.Queue,
        semaphore: asyncio.Semaphore,
) -> None:
    print(f"{consumer_name} started working!")
    while True:
        task = await q.get()

        if task is None:  # stop if poison pill was received
            break

        print(f"{consumer_name} took {task} from queue!")

        # number of tasks which could be processed simultaneously
        # is limited by semaphore
        async with semaphore:
            print(f"{consumer_name} started working with {task}!")
            await asyncio.sleep(4)
            print(f"{consumer_name} finished working with {task}!")

    print(f"{consumer_name} finished working!")


async def async_main() -> None:
    """Main entrypoint of async app."""
    tasks = [f"TheTask#{i + 1}" for i in range(TASKS_NUMBER)]
    q = asyncio.Queue(maxsize=2)
    s = asyncio.Semaphore(value=2)
    consumers = [
        consumer(
            consumer_name=f"Consumer#{i + 1}",
            q=q,
            semaphore=s,
        ) for i in range(CONSUMERS_NUMBER)
    ]
    await asyncio.gather(producer(tasks_to_do=tasks, q=q), *consumers)


if __name__ == "__main__":
    asyncio.run(async_main())


Upvotes: 15

ustulation

Reputation: 3760

I don't like the accepted answer (or at least the current version of it). It doesn't "limit the number of simultaneous async functions running", which is what the question asks for. Consequently it doesn't scale: imagine a producer with millions of records instead of just the 10 in the example. You almost certainly don't want a million entries in the event loop.

Instead, we should limit the creation of the task itself, not the work done by the task once it's already created. Here's example code that works:

import asyncio
import sys
import typing
from collections.abc import Awaitable, Iterable


ConsumerId: typing.TypeAlias = int

tasks: dict[ConsumerId, Awaitable] = {}
sem = asyncio.Semaphore(4)

async def consumer(data, consumer_id: ConsumerId):
    try:
        print(f'Starting consumer {consumer_id}')
        print(f'Working on "{data}" ...')
        await asyncio.sleep(2)
        print(f'Exiting consumer {consumer_id}')
    finally:
        sem.release()
        # If we exited cleanly, just remove ourselves from `tasks` dict because nobody needs to await us.
        if not sys.exc_info()[0]:
            del tasks[consumer_id]

async def main(producer: Iterable):
    for consumer_id, data in enumerate(producer):
        await sem.acquire()
        consumer_task = asyncio.create_task(consumer(data, consumer_id = consumer_id))
        tasks[consumer_id] = consumer_task

    # At this point, await all the remaining tasks, which are either
    # still running or exited with an exception
    for coro in asyncio.as_completed(tasks.values()):
        try:
            await coro
        except Exception as exc:
            print(f'Do something about {exc}')

producer = range(100)
asyncio.run(main(producer = producer))

If you don't await a task that exited with an exception you get:

Task exception was never retrieved
future: <Task finished name='Task-2' coro=<consumer() done, defined at [...]

If you don't care about handling exceptions, or you want to handle the exception fully in the consumer itself (e.g. just log it and move on), then just do del tasks[consumer_id] in the finally block unconditionally; that keeps the footprint low if a lot of consumers are prone to exit with an exception.

Upvotes: 3
