Reputation: 322
I am currently building a project that requires multiple requests made to various endpoints. I am wrapping these requests in aiohttp so they can run asynchronously.
The problem:
I have three Queues: queue1, queue2, and queue3. Additionally, I have three worker functions (worker1, worker2, worker3), each associated with its respective Queue. The first queue is populated immediately with a list of IDs that is known prior to running. When a request is finished and the data is committed to a database, worker1 passes the ID to queue2. A worker2 will take this ID and request more data. From this data it will begin to generate a list of IDs (different from the IDs in queue1/queue2). worker2 will put these IDs in queue3. Finally, worker3 will grab an ID from queue3 and request more data before committing it to a database.
The issue arises from the fact that queue.join() is a blocking call. Each worker is tied to a separate Queue, so the join for queue1 will block until that queue is finished. This is fine, but it also defeats the purpose of using async. Without using join(), the program is unable to detect when the Queues are totally empty. The other issue is that there may be silent errors when one of the Queues is momentarily empty but there is still data that hasn't been added to it yet.
The basic code outline is as follows:
tasks = []
queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()

async with aiohttp.ClientSession() as session:
    for i in range(3):
        tasks.append(asyncio.create_task(worker1(queue1)))
    for i in range(3):
        tasks.append(asyncio.create_task(worker2(queue2)))
    for i in range(10):
        tasks.append(asyncio.create_task(worker3(queue3)))
    for i in IDs:
        queue1.put_nowait(i)
    await asyncio.gather(*tasks)
The worker functions sit in an infinite loop waiting for items to enter the queue.
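For illustration, worker1 looks roughly like this (fetch and commit_to_db are stand-ins for the real aiohttp request and database code):

async def worker1(queue1):
    while True:
        ID = await queue1.get()
        data = await fetch(session, ID)   # aiohttp request to the endpoint
        await commit_to_db(data)          # database write
        await queue2.put(ID)              # hand the ID on to worker2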
When all the data has been processed, the workers never exit and the program hangs.
Is there a way to manage the workers effectively and shut everything down properly?
Upvotes: 3
Views: 3612
Reputation: 155495
As nicely explained in this answer, Queue.join serves to inform the producer when all the work injected into the queue got completed. Since your first queue doesn't know when a particular item is done (it's multiplied and distributed to other queues), join is not the right tool for you.
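For reference, here is the contract join is designed for, in a self-contained toy (consumer and main are just illustrative names): the consumer must call task_done() for every item, and join() unblocks only once every item has been marked done.

import asyncio

async def consumer(queue):
    while True:
        item = await queue.get()
        # ... process item ...
        queue.task_done()      # mark this item as fully processed

async def main():
    queue = asyncio.Queue()
    worker = asyncio.create_task(consumer(queue))
    for i in range(10):
        queue.put_nowait(i)
    await queue.join()         # unblocks once task_done() was called for each item
    worker.cancel()            # the consumer loops forever, so cancel it

asyncio.run(main())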
Judging from your code, it seems that your workers need to run for only as long as it takes to process the queue's initial items. If that is the case, then you can use a shutdown sentinel to signal the workers to exit. For example:
async with aiohttp.ClientSession() as session:
    # ... create tasks as above ...
    for i in IDs:
        queue1.put_nowait(i)
    queue1.put_nowait(None)  # no more work
    await asyncio.gather(*tasks)
This is like your original code, but with an explicit shutdown request. Workers must detect the sentinel and react accordingly: propagate it to the next queue/worker and exit. For example, in worker1:
while True:
    item = await queue1.get()
    if item is None:
        # done with processing, propagate sentinel to worker2 and exit
        await queue2.put(None)
        break
    # ... process item as usual ...
Doing the same in the other two workers (except for worker3, which won't propagate because there's no next queue) will result in all three tasks completing once the work is done. Since queues are FIFO, the workers can safely exit after encountering the sentinel, knowing that no items have been dropped. The explicit shutdown also distinguishes a shut-down queue from one that happens to be empty, thus preventing workers from exiting prematurely due to a temporarily empty queue.
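To see the whole shutdown in action, here is a self-contained toy version of your pipeline with one worker per queue; the arithmetic stands in for the actual requests and database commits. (With several workers per queue, you would enqueue one sentinel for each worker.)

import asyncio

async def worker1(queue1, queue2):
    while True:
        item = await queue1.get()
        if item is None:
            await queue2.put(None)    # propagate sentinel and exit
            break
        await queue2.put(item * 10)   # stand-in for request + DB commit

async def worker2(queue2, queue3):
    while True:
        item = await queue2.get()
        if item is None:
            await queue3.put(None)
            break
        await queue3.put(item + 1)

async def worker3(queue3):
    while True:
        item = await queue3.get()
        if item is None:
            break                     # last stage, nothing to propagate
        print('processed', item)

async def main():
    queue1, queue2, queue3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    tasks = [
        asyncio.create_task(worker1(queue1, queue2)),
        asyncio.create_task(worker2(queue2, queue3)),
        asyncio.create_task(worker3(queue3)),
    ]
    for i in range(5):
        queue1.put_nowait(i)
    queue1.put_nowait(None)           # no more work
    await asyncio.gather(*tasks)      # returns once all three workers exit

asyncio.run(main())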
Up to Python 3.7, this technique was actually demonstrated in the documentation of Queue, but that example somewhat confusingly shows both the use of Queue.join and the use of a shutdown sentinel. The two are separate and can be used independently of one another. (And it might also make sense to use them together, e.g. to use Queue.join to wait for a "milestone", and then put other stuff in the queue, while reserving the sentinel for stopping the workers.)
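A minimal sketch of that combination, assuming a single worker and two batches of work:

import asyncio

async def worker(queue):
    while True:
        item = await queue.get()
        if item is None:
            break                # sentinel: actual shutdown
        # ... process item ...
        queue.task_done()        # needed for queue.join() to make progress

async def main():
    queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    for i in range(5):
        queue.put_nowait(i)
    await queue.join()           # milestone: first batch fully processed
    for i in range(5, 10):
        queue.put_nowait(i)      # put other stuff in the queue
    queue.put_nowait(None)       # sentinel reserved for stopping the worker
    await task

asyncio.run(main())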
Upvotes: 5