user2968505

Reputation: 445

Asynchronous python function calls, keep rescheduling functions in asyncio.gather without waiting for longest running task

So currently I have asynchronous Python code (using asyncio) that looks like this:

while datetime.now() < time_limit:
    last_start_run_time = datetime.now()
    results = await asyncio.gather(
        *(get_output(output_source) for output_source in output_sources)
    )
    for output in results:
        output_dict.update(output)
    if (datetime.now() - last_start_run_time).seconds < upper_bound_wait:
        await asyncio.sleep(delay)

The problem with this code is that it always waits for the longest-running get_output call before calling the function again for all of the output sources.

I am wondering how I can rewrite this so that get_output is called again for each output_source as soon as its previous run has finished (as long as it is within the upper_bound_wait). I would also like the delay to apply per get_output call rather than only after all of them have finished.

How can this be achieved using asyncio?

Upvotes: 0

Views: 459

Answers (2)

Paul Cornelius

Reputation: 10946

Suggestion: move all your logic into a coroutine and create the tasks in a simple loop. Each task will decide for itself when to delay, when to repeat, and when to exit.

async def get_output_repeatedly(upper_bound_wait, output_source):
    while datetime.now() < time_limit:
        last_start_run_time = datetime.now()
        output_dict.update(await get_output(output_source))
        if (datetime.now() - last_start_run_time).seconds < upper_bound_wait:
            await asyncio.sleep(delay)

def run_them_all():
    for output_source in output_sources:
        asyncio.create_task(get_output_repeatedly(upper_bound_wait, output_source))
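
For completeness, a minimal sketch of how these tasks could be driven to completion (my addition, not part of the answer; main() is a hypothetical entry point, and get_output, output_sources and upper_bound_wait come from the question):

async def main():
    # Gathering the created tasks keeps the event loop alive until every
    # per-source loop has passed time_limit.
    tasks = [
        asyncio.create_task(get_output_repeatedly(upper_bound_wait, output_source))
        for output_source in output_sources
    ]
    await asyncio.gather(*tasks)

asyncio.run(main())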

Upvotes: 2

user4815162342

Reputation: 155046

You can use asyncio.wait with FIRST_COMPLETED in a loop. You'll need to store the original source somewhere, e.g. on the future/task objects which are both passed to and returned from asyncio.wait. That way the loop can reinvoke get_output with the same source to reschedule it. Likewise, to make the delay per-get_output-invocation, you'll need to store the start time of each previous invocation, probably also on the future/task. For example (untested):

import asyncio
import time

async def delayed_run(aw):
    await asyncio.sleep(delay)
    return await aw

async def run_sources(output_sources, time_limit, upper_bound_wait):
    output_dict = {}

    # Stash the start time and the source on the task objects themselves, so
    # the loop below can reschedule the same source after each completion.
    pending = set()
    for output_source in output_sources:
        fut = asyncio.create_task(get_output(output_source))
        fut.my_start_time = time.time()
        fut.my_source = output_source
        pending.add(fut)

    while pending and time.time() < time_limit:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED,
            timeout=time_limit - time.time())

        for done_fut in done:
            output_dict.update(done_fut.result())
            new_coro = get_output(done_fut.my_source)
            if time.time() - done_fut.my_start_time < upper_bound_wait:
                new_coro = delayed_run(new_coro)
            new_fut = asyncio.create_task(new_coro)
            new_fut.my_start_time = time.time()
            new_fut.my_source = done_fut.my_source
            pending.add(new_fut)

    return output_dict
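
A possible invocation (my sketch, not part of the answer; note that time_limit here is an absolute time.time() timestamp, unlike the datetime used in the question, and the 60-second window and upper_bound_wait value are only example numbers):

# Run all sources for roughly 60 seconds, re-running each one as soon as it
# finishes, with the per-call delay applied when a run completes too quickly.
output_dict = asyncio.run(
    run_sources(output_sources, time_limit=time.time() + 60, upper_bound_wait=5)
)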

Upvotes: 1
