noob

Reputation: 357

Passing *args and **kwargs to an asyncio periodic wrapper function from a dictionary

I am using asyncio to gather tasks from a dictionary and execute them, but I'm having difficulty getting it to work as intended. (This is a follow-up to an earlier question of mine; I rewrote the code because it didn't work as intended and decided it would be better to use a wrapper function instead.)

So I'm using a wrapper to call the specified task function. I want the wrapper to forward any *args or **kwargs to the task function, and also to repeat the task periodically if the interval kwarg is set.

How do I pass this information to the wrapper and the task function, while keeping it easily maintainable with the ability to easily add new tasks to the tasks dictionary?

Please take a look at my code for illustration.

import asyncio
import random

async def run_task(taskname, taskfunc, interval=None, *args, **kwargs):
    # Wrapper which will run the specified function, and repeat it if 'interval' is set.
    # Should also be able to pass any potential *args and **kwargs to the function.
    fakedelay = random.randint(1,6)
    print(f'{taskname} started (completing in {fakedelay} seconds)')
    await taskfunc(fakedelay, *args, **kwargs)
    print(f'{taskname} completed after {fakedelay} seconds')
    if interval is not None:
        print(f'Repeating {taskname} in {interval} seconds...')
        while True:
            await taskfunc(fakedelay, *args, **kwargs)
            await asyncio.sleep(interval)

async def faketask(fakedelay, *args, **kwargs):
    # Function to simulate a coroutine task
    await asyncio.sleep(fakedelay)

async def main():
    tasks = {
        # Dictionary of tasks to perform
        'Task-1': faketask,
        'Task-2': faketask,
        'Task-3': faketask,
    }

    tasklist = []
    for taskname, taskfunc in tasks.items():
        tasklist.append(run_task(taskname, taskfunc))
        print(f'Added {taskname} to job queue.')
    await asyncio.gather(*tasklist)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

This seems to work well so far. But let's say that I want Task-3 to repeat every 10 seconds after each time it completes. I would like to simply specify it in the tasks dictionary, to make it as simple as possible to add new tasks in the future. E.g. like this:

tasks = {
    # Dictionary of tasks to perform
    'Task-1': faketask,
    'Task-2': faketask,
    'Task-3': faketask(interval=10),
}

But running this gives TypeError: faketask() missing 1 required positional argument: 'fakedelay'. I suppose that makes sense, because the interval kwarg is meant for the wrapper and not for the task function (faketask) itself. The wrapper also never gets the chance to supply the remaining arguments (fakedelay in this situation), because faketask is called directly while the dictionary is being built.

In my previous question I was given the suggestion to use functools.partial.

tasks = {
    'Task-1': faketask,
    'Task-2': faketask,
    'Task-3': functools.partial(faketask, interval=10),
}

That partly solved the issue from my previous question, but after rewriting the code and adding a wrapper function it seemingly does nothing, and admittedly I'm having difficulty understanding how functools.partial is meant to be used.

So my questions are:

  1. How can I go about this? Is this an appropriate way to accomplish what I'm trying to do?

  2. How can I provide *args and **kwargs to a specific function in the tasks dictionary as simply as possible (so new tasks can be added easily), and have them forwarded to the task function itself via the wrapper?

  3. Is my method of repeating a function periodically correct? I specifically want it to only sleep after completion before starting again, and not just fire off again even if the last instance hasn't finished yet.

Upvotes: 0

Views: 1243

Answers (1)

user4815162342

Reputation: 155046

Using functools.partial only makes sense if you are actually wrapping faketask so that it receives an additional keyword argument. If you need to apply the keyword argument to a different function (run_task), then you need to do so independently. For example, you could specify additional options for run_task in the tasks dict:

tasks = {
    'Task-1': faketask,
    'Task-2': faketask,
    'Task-3': (faketask, {'interval': 10}),
}
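To see why functools.partial does not reach the wrapper, here is a minimal sketch. The run_task below is a simplified stand-in (not the wrapper from the question), and faketask is changed to return what it received so the result can be inspected; both are assumptions made for illustration.

```python
import asyncio
import functools

async def faketask(fakedelay, *args, **kwargs):
    # Return what the task function itself received.
    await asyncio.sleep(0)
    return fakedelay, args, kwargs

async def run_task(taskfunc, interval=None):
    # Simplified stand-in for the wrapper: return the wrapper's own
    # 'interval' alongside whatever the task function received.
    received = await taskfunc(1)
    return interval, received

# partial binds interval=10 to faketask, not to run_task.
bound = functools.partial(faketask, interval=10)
wrapper_interval, task_received = asyncio.run(run_task(bound))

print(wrapper_interval)  # None
print(task_received)     # (1, (), {'interval': 10})
```

The wrapper's interval stays None, while the bound keyword ends up in faketask's **kwargs, which is why the repeat logic never triggers.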

The code that invokes run_task will then need to recognize the tuples:

for taskname, taskfunc_maybe_with_options in tasks.items():
    if isinstance(taskfunc_maybe_with_options, tuple):
        taskfunc, options = taskfunc_maybe_with_options
    else:
        taskfunc = taskfunc_maybe_with_options
        options = {}
    tasklist.append(run_task(taskname, taskfunc, **options))
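Putting the pieces together, a self-contained sketch might look like the following. Several names here are assumptions that are not in the original code: normalize, the events list, and the max_runs option (added only so that the example terminates), and the delays are shortened. The wrapper's options are made keyword-only so that positional arguments meant for the task function cannot be mistaken for interval.

```python
import asyncio

events = []  # Records (taskname, run_number) for inspection afterwards.

async def faketask(delay):
    # Stand-in for real work.
    await asyncio.sleep(delay)

async def run_task(taskname, taskfunc, *args, interval=None, max_runs=None, **kwargs):
    # Run taskfunc once, or repeatedly when 'interval' is set.
    # 'max_runs' is a hypothetical option that bounds the repetition.
    runs = 0
    while True:
        await taskfunc(*args, **kwargs)
        runs += 1
        events.append((taskname, runs))
        if interval is None or (max_runs is not None and runs >= max_runs):
            break
        # Sleep only after the previous run has finished.
        await asyncio.sleep(interval)

def normalize(entry):
    # Accept either a bare function or a (function, options) tuple.
    if isinstance(entry, tuple):
        return entry
    return entry, {}

async def main():
    tasks = {
        'Task-1': faketask,
        'Task-2': faketask,
        'Task-3': (faketask, {'interval': 0.02, 'max_runs': 3}),
    }
    jobs = []
    for taskname, entry in tasks.items():
        taskfunc, options = normalize(entry)
        # 0.01 is forwarded to faketask; options go to the wrapper.
        jobs.append(run_task(taskname, taskfunc, 0.01, **options))
    await asyncio.gather(*jobs)

asyncio.run(main())
print(events)
```

Because the loop awaits the task before sleeping, a new run cannot start while the previous one is still in progress; the interval is counted from completion rather than from the start of the previous run.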

Upvotes: 2
