fluffy

Reputation: 5314

asyncio: Scheduling work items that schedule other work items

I am writing a Python program that schedules a number of asynchronous, I/O-bound work items, many of which will themselves schedule other, similar work items. The work items are completely independent of one another: none of them needs another's result, nor do I need to gather any results from them for local output (beyond logging, which happens inside the work items themselves).

I was originally using a pattern like this:

import asyncio

async def some_task(foo):
    pending = []

    for x in foo:
        # ... do some work ...
        if some_condition:
            pending.append(some_task(bar))

    if pending:
        await asyncio.wait(pending)

However, I was running into trouble: some of the nested asyncio.wait(pending) calls would sometimes hang forever, even though the individual things being awaited always completed. (The debug output produced when I used KeyboardInterrupt to dump the state of the un-gathered results showed all of the futures as done.) When I asked others for help, they said I should be using asyncio.create_task instead, but I have not found any useful information about how to do this, nor have I been able to get clarification from the people who suggested it.

So, how can I satisfy this use case?

Upvotes: 0

Views: 114

Answers (1)

dm03514

Reputation: 55952

Python's asyncio.Queue may help tie your program's processing to its completion. It has a join() method that blocks until every item put into the queue has been received and processed (i.e. marked with task_done()).

Another benefit I like is that the worker becomes more explicit: it pulls an item from the queue, processes it, potentially adds more items, and then acknowledges it with task_done(). But this is just personal preference.

import asyncio

async def worker(q):
    while True:
        item = await q.get()

        # process item; potentially enqueue more work
        if some_condition:
            await q.put('something new')

        q.task_done()


async def run():
    queue = asyncio.Queue()
    await queue.put('initial work')  # seed the queue, or join() returns immediately
    worker_task = asyncio.ensure_future(worker(queue))
    await queue.join()
    worker_task.cancel()


loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
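On Python 3.7+, the same flow can be written with asyncio.run() and asyncio.create_task() instead of get_event_loop()/ensure_future(). A minimal, runnable sketch (the processed list is just a stand-in for your real work):

```python
import asyncio

processed = []  # hypothetical: records which items the worker handled

async def worker(q):
    while True:
        item = await q.get()
        processed.append(item)  # stand-in for the real processing
        q.task_done()

async def main():
    q = asyncio.Queue()
    for i in range(3):
        await q.put(i)              # seed the queue with some initial work
    task = asyncio.create_task(worker(q))
    await q.join()                  # returns once every item is task_done()
    task.cancel()                   # the worker loops forever, so cancel it

asyncio.run(main())
```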

The example above was adapted from the asyncio producer/consumer example and modified, since your worker both consumes and produces:

https://asyncio.readthedocs.io/en/latest/producer_consumer.html


I'm not sure exactly how to fix your specific example, but I would definitely look at the primitives asyncio offers to let the event loop hook into your program's state, notably Queue and its join().
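As for the create_task suggestion from your question, one possible sketch is to hand each new work item to the event loop with asyncio.create_task(), keep strong references in a set (so pending tasks are not garbage-collected), and drain that set until nothing spawns more work. The depth argument, results list, and the depth < 2 condition here are all hypothetical stand-ins for your real work items:

```python
import asyncio

tasks = set()  # strong references: create_task results must be kept alive

def schedule(coro):
    task = asyncio.create_task(coro)
    tasks.add(task)
    task.add_done_callback(tasks.discard)  # drop the reference once done
    return task

results = []  # hypothetical stand-in for the logging the question mentions

async def some_task(depth):
    await asyncio.sleep(0)        # stand-in for the real I/O-bound work
    results.append(depth)
    if depth < 2:                 # stand-in for some_condition
        schedule(some_task(depth + 1))

async def main():
    schedule(some_task(0))
    while tasks:                  # loop: a gathered task may have spawned more
        await asyncio.gather(*tasks)

asyncio.run(main())
```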

Upvotes: 3

Related Questions