Lionead


How to implement an asyncio worker queue in Python?

I need to make a worker queue for aiohttp.

Right now I'm using asyncio.gather, but it works the wrong way:

[image not included]

This is what I want to make:

[image not included]

The first one can be implemented with the following code:

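import asyncio

async def do_stuff(item):
    pass  # the real work, e.g. an aiohttp request for this item

async def main(data):
    tasks = []
    for i in data:
        tasks.append(do_stuff(i))
    # gather starts every coroutine at once and waits for all of them
    await asyncio.gather(*tasks)

asyncio.run(main(data))  # data is the list of items to process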

I need an example of the second approach.

Upvotes: 2

Views: 5683

Answers (1)

Paul Cornelius


As I understand it, you want to run exactly 5 tasks concurrently. When one of those tasks finishes, you want to start a new task immediately. For this purpose, asyncio.gather doesn't work, since it waits for all of its tasks to finish before proceeding.
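
To illustrate the problem, here is a minimal sketch that feeds work to asyncio.gather in fixed batches of 5. The batching and the names `job` and `run_in_batches` are hypothetical (the question's code passes everything to a single gather call); the point is that one slow job holds up the start of the whole next batch.

```python
import asyncio

async def job(delay, starts):
    # record when this job started, using the event loop's clock
    starts.append(asyncio.get_running_loop().time())
    await asyncio.sleep(delay)

async def run_in_batches(delays, batch_size=5):
    starts = []
    for i in range(0, len(delays), batch_size):
        batch = delays[i:i + batch_size]
        # gather returns only after every job in the batch has finished,
        # so four slots sit idle while the slowest job is still running
        await asyncio.gather(*(job(d, starts) for d in batch))
    return starts

if __name__ == "__main__":
    # four short jobs and one long one per batch
    starts = asyncio.run(run_in_batches([0.05, 0.05, 0.05, 0.05, 0.3] * 2))
    print(f"second batch started {starts[5] - starts[0]:.2f}s after the first")
```

The second batch starts only after the 0.3 s job, even though four of the five slots were free after 0.05 s.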

I suggest something along these lines:

from collections import deque
import random
import asyncio

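class RunSome:
    """Run coroutines with at most task_count of them active at once."""

    def __init__(self, task_count=5):
        self.task_count = task_count
        self.running = set()    # coroutines currently being executed
        self.waiting = deque()  # coroutines queued until a slot frees up
        self.tasks = set()      # strong references to the wrapper tasks

    @property
    def running_task_count(self):
        return len(self.running)

    def add_task(self, coro):
        # start immediately if a slot is free, otherwise queue the coroutine
        if len(self.running) >= self.task_count:
            self.waiting.append(coro)
        else:
            self._start_task(coro)

    def _start_task(self, coro):
        self.running.add(coro)
        task = asyncio.create_task(self._task(coro))
        # the event loop keeps only weak references to tasks, so hold one here
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def _task(self, coro):
        try:
            return await coro
        finally:
            # free the slot and hand it to the next queued coroutine, if any
            self.running.remove(coro)
            if self.waiting:
                coro2 = self.waiting.popleft()
                self._start_task(coro2)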
            
async def main():
    runner = RunSome()
    async def rand_delay():
        rnd = random.random() + 0.5
        print("Task started", asyncio.current_task().get_name(),
              runner.running_task_count)
        await asyncio.sleep(rnd)
        print("Task ended", asyncio.current_task().get_name(),
              runner.running_task_count)
    for _ in range(50):
        runner.add_task(rand_delay())
    # keep the program alive until all the tasks are done
    while runner.running_task_count > 0:
        await asyncio.sleep(0.1)
        
if __name__ == "__main__":
    asyncio.run(main())
        

Output:

Task started Task-2 5
Task started Task-3 5
Task started Task-4 5
Task started Task-5 5
Task started Task-6 5
Task ended Task-6 5
Task started Task-7 5
Task ended Task-4 5
Task ended Task-2 5
Task started Task-8 5
Task started Task-9 5
Task ended Task-5 5
Task started Task-10 5
Task ended Task-3 5
.....
Task started Task-51 5
Task ended Task-48 5
Task ended Task-47 4
Task ended Task-49 3
Task ended Task-51 2
Task ended Task-50 1

Coroutines are first-class objects in Python, so they can be put into lists, sets and deques like any other object.
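
A tiny sketch of that point (the `double` function is hypothetical): calling a coroutine function creates a coroutine object without running it, the objects are stored in a deque and a set, and each one is awaited later, which is what RunSome does with its `waiting` and `running` collections.

```python
import asyncio
from collections import deque

async def double(x):
    return 2 * x

async def main():
    # calling a coroutine function only creates a coroutine object;
    # its body does not run until the object is awaited
    pending = deque(double(i) for i in range(3))
    # coroutine objects are hashable, so they can also live in a set
    running = set(pending)
    results = []
    while pending:
        coro = pending.popleft()
        results.append(await coro)
        running.remove(coro)
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4]
```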

All of the task creation is handled by RunSome. You pass it coroutines to be executed. It knows how many tasks are currently running, and it decides either to create a new task immediately or add the coroutine to a queue of pending tasks. When a task finishes, it grabs a new coroutine out of the queue, if one is available. The number of running tasks never exceeds the threshold count that was passed to the constructor (default is 5). The tasks are wrappers around the passed coroutines.
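
Since the question asks for a worker queue, here is an alternative sketch that swaps in `asyncio.Queue` with a fixed pool of worker tasks instead of a class like RunSome. `do_stuff`, `worker` and `run_pool` are hypothetical names, `do_stuff` stands in for the real aiohttp call, and keying results by item assumes the items are hashable and unique.

```python
import asyncio

async def do_stuff(item):
    # hypothetical stand-in for the real aiohttp request
    await asyncio.sleep(0.01)
    return item * 2

async def worker(queue, results):
    # each worker takes the next item as soon as it finishes the previous one
    while True:
        item = await queue.get()
        try:
            results[item] = await do_stuff(item)
        except Exception as exc:
            # keep the worker alive and record the failure instead
            results[item] = exc
        finally:
            queue.task_done()

async def run_pool(data, worker_count=5):
    queue = asyncio.Queue()
    for item in data:
        queue.put_nowait(item)
    results = {}
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(worker_count)]
    await queue.join()  # wait until every queued item has been processed
    for w in workers:
        w.cancel()  # the workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results

if __name__ == "__main__":
    print(asyncio.run(run_pool(range(20))))
```

With five workers, at most five items are in flight, and a free worker picks up the next item immediately, so no polling loop is needed to know when everything is done.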

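You will have to figure out what to do with the returned values, if any. The error handling here is rudimentary: an exception raised by a coroutine ends up on its wrapper task, which nothing awaits, so asyncio will only log it ("Task exception was never retrieved"). Thanks to the try/finally block, though, the correct number of tasks keeps running even when a coroutine raises.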
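
For collecting return values, one common option (a different technique from RunSome, swapped in here for illustration) is to cap concurrency with `asyncio.Semaphore` and let `asyncio.gather(..., return_exceptions=True)` collect the results in input order. `do_stuff`, `limited` and `run_limited` are hypothetical names, and the `ValueError` for item 3 exists only to show how a failure appears in the results.

```python
import asyncio

async def do_stuff(item):
    # hypothetical stand-in for the real aiohttp request
    await asyncio.sleep(0.01)
    if item == 3:
        raise ValueError(f"bad item {item}")  # simulated failure
    return item * 2

async def limited(sem, coro):
    # at most `limit` coroutines get past this point at the same time
    async with sem:
        return await coro

async def run_limited(data, limit=5):
    sem = asyncio.Semaphore(limit)
    # gather keeps results in input order; return_exceptions=True puts
    # exceptions into the result list instead of raising the first one
    return await asyncio.gather(
        *(limited(sem, do_stuff(item)) for item in data),
        return_exceptions=True,
    )

if __name__ == "__main__":
    print(asyncio.run(run_limited(range(6))))
```

The trade-off is that gather creates a wrapper task for every item up front, even though only `limit` of them run `do_stuff` at a time.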

Upvotes: 6
