Reputation: 177
I am trying to limit the number of simultaneous async functions running using a semaphore, but I cannot get it to work. My code boils down to this:
import asyncio

async def send(i):
    print(f"starting {i}")
    await asyncio.sleep(4)
    print(f"ending {i}")

async def helper():
    async with asyncio.Semaphore(value=5):
        await asyncio.gather(*[
            send(1),
            send(2),
            send(3),
            send(4),
            send(5),
            send(6),
            send(7),
            send(8),
            send(9),
            send(10),
        ])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(helper())
    loop.close()
The output is:
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
starting 9
starting 10
ending 1
ending 2
ending 3
ending 4
ending 5
ending 6
ending 7
ending 8
ending 9
ending 10
I hoped and expected that only 5 would run at a time; however, all 10 start and stop at the same time. What am I doing wrong?
Upvotes: 10
Views: 10231
Reputation: 3836
Please find the working example below, feel free to ask questions:
import asyncio

async def send(i: int, semaphore: asyncio.Semaphore):
    # to demonstrate that all tasks start nearly together
    print(f"Hello: {i}")
    # only two tasks can run code inside the block below simultaneously
    async with semaphore:
        print(f"starting {i}")
        await asyncio.sleep(4)
        print(f"ending {i}")

async def async_main():
    s = asyncio.Semaphore(value=2)
    await asyncio.gather(*[send(i, semaphore=s) for i in range(1, 11)])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())
    loop.close()
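To convince yourself that the semaphore really caps concurrency, here is a small self-check (my own sketch, not part of the answer above): a shared dict tracks how many coroutines are inside the `async with` block at once and records the peak.

```python
import asyncio

async def work(i: int, sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        # count how many coroutines hold the semaphore right now
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for real work
        state["active"] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(2)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*[work(i, sem, state) for i in range(10)])
    return state["peak"]

peak = asyncio.run(main())
print(peak)  # → 2: never more than two coroutines inside the block at once
```

The key point, as in the answer's code, is that every coroutine shares one semaphore instance and acquires it *inside* its own body, rather than one coroutine wrapping the whole `gather` call.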
VERSION FROM 18.08.2023:
I see that many people are interested in how to use asyncio.Semaphore
, so I decided to extend my answer.
The new version illustrates how to use the producer-consumer pattern with asyncio.Semaphore
. If you want something very simple, the code from the original answer above is fine. If you want a more robust solution, which lets you limit the number of asyncio.Tasks
doing the work, you can use this version.
import asyncio
from typing import List

CONSUMERS_NUMBER = 10  # workers/consumers number
TASKS_NUMBER = 20  # number of tasks to do

async def producer(tasks_to_do: List[int], q: asyncio.Queue) -> None:
    print("Producer started working!")
    for task in tasks_to_do:
        await q.put(task)  # put tasks to Queue
    # poison pill technique
    for _ in range(CONSUMERS_NUMBER):
        await q.put(None)  # put poison pill to all workers/consumers
    print("Producer finished working!")

async def consumer(
    consumer_name: str,
    q: asyncio.Queue,
    semaphore: asyncio.Semaphore,
) -> None:
    print(f"{consumer_name} started working!")
    while True:
        task = await q.get()
        if task is None:  # stop if poison pill was received
            break
        print(f"{consumer_name} took {task} from queue!")
        # number of tasks which could be processed simultaneously
        # is limited by semaphore
        async with semaphore:
            print(f"{consumer_name} started working with {task}!")
            await asyncio.sleep(4)
            print(f"{consumer_name} finished working with {task}!")
    print(f"{consumer_name} finished working!")

async def async_main() -> None:
    """Main entrypoint of async app."""
    tasks = [f"TheTask#{i + 1}" for i in range(TASKS_NUMBER)]
    q = asyncio.Queue(maxsize=2)
    s = asyncio.Semaphore(value=2)
    consumers = [
        consumer(
            consumer_name=f"Consumer#{i + 1}",
            q=q,
            semaphore=s,
        ) for i in range(CONSUMERS_NUMBER)
    ]
    await asyncio.gather(producer(tasks_to_do=tasks, q=q), *consumers)

if __name__ == "__main__":
    asyncio.run(async_main())
Upvotes: 15
Reputation: 3760
I don't like the accepted answer (or at least its current version). It doesn't "limit the number of simultaneous async functions running", which is what the question asks for. Consequently it doesn't scale: imagine a producer with millions of records instead of just the 10 in the example. You almost certainly don't want a million entries in the event loop.
Instead we should limit the creation of the task itself, not the work done by the task once it's already created. Here's example code that works:
import asyncio
import sys
import typing
from collections.abc import Awaitable, Iterable

ConsumerId: typing.TypeAlias = int

tasks: dict[ConsumerId, Awaitable] = {}
sem = asyncio.Semaphore(4)

async def consumer(data, consumer_id: ConsumerId):
    try:
        print(f'Starting consumer {consumer_id}')
        print(f'Working on "{data}" ...')
        await asyncio.sleep(2)
        print(f'Exiting consumer {consumer_id}')
    finally:
        sem.release()
        # If we exited cleanly, just remove ourselves from the `tasks` dict
        # because nobody needs to await us.
        if not sys.exc_info()[0]:
            del tasks[consumer_id]

async def main(producer: Iterable):
    for consumer_id, data in enumerate(producer):
        await sem.acquire()
        consumer_task = asyncio.create_task(consumer(data, consumer_id=consumer_id))
        tasks[consumer_id] = consumer_task
    # At this point just await all the remaining tasks, which are either
    # still running or have exited with an exception
    for coro in asyncio.as_completed(tasks.values()):
        try:
            await coro
        except Exception as exc:
            print(f'Do something about {exc}')

producer = range(100)
asyncio.run(main(producer=producer))
If you don't await a task that exited with an exception you get:
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<consumer() done, defined at [...]
If you don't care about handling exceptions, or want to handle the exception fully in the consumer itself (e.g. just log it and move on), then just del tasks[consumer_id]
in finally
unconditionally; that keeps the footprint low if a lot of consumers are prone to exit with an exception.
Upvotes: 3