Reputation: 357
I am using asyncio to gather tasks from a dictionary and execute them but I'm having difficulty getting it to work as intended. (This is sort of a follow-up question to my question here but I re-wrote the code a bit because it didn't work as intended and decided it would be better to use a wrapper function instead.)
So I'm using a wrapper to call the specified task function. I want the wrapper to forward any *args or **kwargs to the task function, and also repeat the task periodically if the interval kwarg is set.
How do I pass this information to the wrapper and the task function, while keeping the code maintainable, so that new tasks can easily be added to the tasks dictionary?
Please take a look at my code for illustration.
import asyncio
import random

async def run_task(taskname, taskfunc, interval=None, *args, **kwargs):
    # Wrapper which will run the specified function, and repeat it if 'interval' is set.
    # Should also be able to pass any potential *args and **kwargs to the function.
    fakedelay = random.randint(1,6)
    print(f'{taskname} started (completing in {fakedelay} seconds)')
    await taskfunc(fakedelay, *args, **kwargs)
    print(f'{taskname} completed after {fakedelay} seconds')
    if interval is not None:
        print(f'Repeating {taskname} in {interval} seconds...')
        while True:
            await taskfunc(fakedelay, *args, **kwargs)
            await asyncio.sleep(interval)

async def faketask(fakedelay, *args, **kwargs):
    # Function to simulate a coroutine task
    await asyncio.sleep(fakedelay)

async def main():
    tasks = {
        # Dictionary of tasks to perform
        'Task-1': faketask,
        'Task-2': faketask,
        'Task-3': faketask,
    }
    tasklist = []
    for taskname, taskfunc in tasks.items():
        tasklist.append(run_task(taskname, taskfunc))
        print(f'Added {taskname} to job queue.')
    await asyncio.gather(*tasklist)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
This seems to work well so far. But let's say that I want Task-3 to repeat every 10 seconds after each time it completes. I would like to simply specify it in the tasks dictionary, to make it as simple as possible to add new tasks in the future. E.g. like this:
tasks = {
    # Dictionary of tasks to perform
    'Task-1': faketask,
    'Task-2': faketask,
    'Task-3': faketask(interval=10),
}
But running this gives
TypeError: faketask() missing 1 required positional argument: 'fakedelay'
I suppose it makes sense because the interval kwarg is meant for the wrapper and not the task function (faketask) itself. And the wrapper doesn't seem able to add any *args or **kwargs (fakedelay in this situation).
In my previous question I was given the suggestion to use functools.partial.
tasks = {
    'Task-1': faketask,
    'Task-2': faketask,
    'Task-3': functools.partial(faketask, interval=10),
}
It somewhat solved the issue from my previous question, but after rewriting the code and adding a wrapper function, it seemingly does nothing now, and admittedly I'm having difficulty understanding how functools.partial is meant to be used.
So my questions are:
1. How can I go about this? Is this the appropriate way to accomplish what I'm trying to do?
2. How can I provide *args and **kwargs to a specific function in the tasks dictionary, in as simple a way as possible (so new tasks can be easily added), and have them forwarded to the task function itself via the wrapper?
3. Is my method of repeating a function periodically correct? I specifically want it to only sleep after completion before starting again, and not just fire off again even if the last instance hasn't finished yet.
Upvotes: 0
Views: 1243
Reputation: 155046
Using functools.partial only makes sense if you are actually wrapping faketask to include an optional keyword argument, because partial binds its arguments to the function it wraps. If you need to apply the keyword argument to a different function (run_task), then you need to do so independently. For example, you could specify additional options for run_task in the tasks dict:
tasks = {
    'Task-1': faketask,
    'Task-2': faketask,
    'Task-3': (faketask, {'interval': 10}),
}
The code that invokes run_task will then need to recognize the tuples:
for taskname, taskfunc_maybe_with_options in tasks.items():
    if isinstance(taskfunc_maybe_with_options, tuple):
        taskfunc, options = taskfunc_maybe_with_options
    else:
        taskfunc = taskfunc_maybe_with_options
        options = {}
    tasklist.append(run_task(taskname, taskfunc, **options))
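Putting the pieces together, here is a minimal runnable sketch of one way this could look. It is an illustration under a few assumptions, not a drop-in replacement for the question's code: interval is made keyword-only in run_task (a change from the question's signature, so that positional *args cannot accidentally land in interval), and the loop is restructured so the task runs once and then sleeps only after each completion. The names max_runs, split_entry and label are hypothetical additions for the demo; max_runs exists only so the example terminates, and the delays are shortened to fractions of a second. Arguments meant for the task function itself are bound with functools.partial, while options meant for run_task travel in the tuple.

```python
import asyncio
import functools


async def run_task(taskname, taskfunc, *args, interval=None, max_runs=None, **kwargs):
    # 'interval' and 'max_runs' are keyword-only, so they are consumed here;
    # any other positional or keyword arguments are forwarded to taskfunc.
    # 'max_runs' is a hypothetical parameter that lets the demo terminate.
    runs = 0
    while True:
        await taskfunc(*args, **kwargs)
        runs += 1
        print(f'{taskname} completed (run {runs})')
        if interval is None or (max_runs is not None and runs >= max_runs):
            return runs
        # The sleep starts only after the previous run has finished,
        # so two runs of the same task never overlap.
        await asyncio.sleep(interval)


async def faketask(fakedelay, label='x'):
    # Simulated coroutine task; 'label' is a hypothetical extra argument.
    await asyncio.sleep(fakedelay)


def split_entry(entry):
    # Accept either a bare callable or a (callable, options) tuple.
    if isinstance(entry, tuple):
        return entry
    return entry, {}


async def main():
    tasks = {
        # partial binds arguments for faketask itself
        'Task-1': functools.partial(faketask, 0.01),
        'Task-2': functools.partial(faketask, 0.02, label='b'),
        # the options dict is meant for run_task, not for faketask
        'Task-3': (functools.partial(faketask, 0.01),
                   {'interval': 0.01, 'max_runs': 3}),
    }
    coros = []
    for taskname, entry in tasks.items():
        taskfunc, options = split_entry(entry)
        coros.append(run_task(taskname, taskfunc, **options))
    # gather returns the results in the order the coroutines were passed
    return await asyncio.gather(*coros)


if __name__ == '__main__':
    print(asyncio.run(main()))
```

In this sketch main() returns [1, 1, 3]: the first two tasks run once and Task-3 runs three times before max_runs stops it. Dropping max_runs from the options would make Task-3 repeat indefinitely, which appears to be the behavior the question is after.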
Upvotes: 2