Reputation: 3551
I'm migrating from tornado to asyncio, and I can't find the asyncio equivalent of tornado's PeriodicCallback. (A PeriodicCallback takes two arguments: the function to run and the number of milliseconds between calls.)
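Is there such an equivalent in asyncio? If not, what would be the cleanest way to implement this without running the risk of getting a RecursionError after a while?
Upvotes: 119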
Views: 139497
Reputation: 130
import asyncio
from typing import Callable, Any, Optional
import logging

class AsyncioScheduler:
    def __init__(self):
        self.tasks = {}
        self.logger = logging.getLogger(__name__)

    async def schedule(
        self,
        func: Callable,
        interval: float,
        *args: Any,
        times: Optional[int] = None,
        **kwargs: Any
    ) -> str:
        """
        Schedule a function to run periodically.

        :param func: The function to schedule
        :param interval: The interval between runs in seconds
        :param args: Positional arguments to pass to the function
        :param times: Number of times to run the function (None for indefinitely)
        :param kwargs: Keyword arguments to pass to the function
        :return: A unique identifier for the scheduled task
        """
        task_id = f"task_{id(func)}_{len(self.tasks)}"

        async def wrapped_func():
            run_count = 0
            while times is None or run_count < times:
                try:
                    if asyncio.iscoroutinefunction(func):
                        await func(*args, **kwargs)
                    else:
                        await asyncio.to_thread(func, *args, **kwargs)
                except Exception as e:
                    self.logger.error(f"Error in scheduled task {task_id}: {e}")
                run_count += 1
                if times is not None and run_count >= times:
                    break
                await asyncio.sleep(interval)
            self.logger.info(f"Task {task_id} completed after {run_count} runs")
            del self.tasks[task_id]

        task = asyncio.create_task(wrapped_func())
        self.tasks[task_id] = task
        self.logger.info(f"Scheduled task {task_id} to run every {interval} seconds")
        return task_id

    async def cancel(self, task_id: str) -> bool:
        """
        Cancel a scheduled task.

        :param task_id: The unique identifier of the task to cancel
        :return: True if the task was cancelled, False if it wasn't found
        """
        if task_id in self.tasks:
            self.tasks[task_id].cancel()
            del self.tasks[task_id]
            self.logger.info(f"Cancelled task {task_id}")
            return True
        return False

    async def cancel_all(self):
        """
        Cancel all scheduled tasks.
        """
        for task_id in list(self.tasks.keys()):
            await self.cancel(task_id)
        self.logger.info("Cancelled all tasks")

# Example usage
async def example_task(name: str):
    print(f"Running task: {name}")

async def main():
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    scheduler = AsyncioScheduler()
    # Schedule tasks
    await scheduler.schedule(example_task, 2, "Task A")  # Run indefinitely every 2 seconds
    await scheduler.schedule(example_task, 3, "Task B", times=5)  # Run 5 times every 3 seconds
    # Run for a while
    await asyncio.sleep(20)
    # Cancel all tasks
    await scheduler.cancel_all()

if __name__ == "__main__":
    asyncio.run(main())
Upvotes: -1
Reputation: 8006
For multiple types of scheduling, I'd recommend APScheduler, which has asyncio support.
I use it for a simple Python process that I start with Docker; it runs like cron, executing something weekly, until I kill the container or process.
Upvotes: 2
Reputation: 21
This solution uses the decorator concept from Fernando José Esteves de Souza, the drift workaround from Wojciech Migda, and a superclass, to produce code that is as elegant as possible for dealing with asynchronous periodic functions.
The solution consists of the following files:
periodic_async_thread.py, with the base class for you to subclass
a_periodic_thread.py, with an example subclass
run_me.py, with an example instantiation and run
The PeriodicAsyncThread class in the file periodic_async_thread.py:
import time
import asyncio
import abc

class PeriodicAsyncThread:
    def __init__(self, period):
        self.period = period

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()
                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

    @abc.abstractmethod
    async def run(self, *args, **kwargs):
        return

    def start(self):
        asyncio.run(self.run())
An example of a simple subclass, APeriodicThread, in the file a_periodic_thread.py:
from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio

class APeriodicThread(PeriodicAsyncThread):
    def __init__(self, period):
        super().__init__(period)
        self.run = self.periodic()(self.run)

    async def run(self, *args, **kwargs):
        await asyncio.sleep(2)
        print(time.time())
Instantiating and running the example class in the file run_me.py:
from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()
This code represents an elegant solution that also mitigates the time drift problem of other solutions. The output is similar to:
1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736
The solution consists of the following files:
async_thread.py, with the canopy asynchronous thread class
periodic_async_thread.py, with the base class for you to subclass
a_periodic_thread.py, with an example subclass
run_me.py, with an example instantiation and run
The AsyncThread class in the file async_thread.py:
from threading import Thread
import asyncio
import abc

class AsyncThread(Thread):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    @abc.abstractmethod
    async def async_run(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        # loop = asyncio.new_event_loop()
        # asyncio.set_event_loop(loop)
        # loop.run_until_complete(self.async_run(*args, **kwargs))
        # loop.close()
        asyncio.run(self.async_run(*args, **kwargs))
The PeriodicAsyncThread class in the file periodic_async_thread.py:
import time
import asyncio
from .async_thread import AsyncThread

class PeriodicAsyncThread(AsyncThread):
    def __init__(self, period, *args, **kwargs):
        self.period = period
        super().__init__(*args, **kwargs)
        self.async_run = self.periodic()(self.async_run)

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()
                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler
An example of a simple subclass, APeriodicAsyncTHread, in the file a_periodic_thread.py:
import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio

class APeriodicAsyncTHread(PeriodicAsyncThread):
    async def async_run(self, *args, **kwargs):
        print(f"{current_thread().name} {time.time()} Hi!")
        await asyncio.sleep(1)
        print(f"{current_thread().name} {time.time()} Bye!")
Instantiating and running the example class in the file run_me.py:
from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()
This code represents an elegant solution that also mitigates the time drift problem of other solutions. The output is similar to:
a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!
Upvotes: 2
Reputation: 1121148
There is no built-in support for periodic calls, no.
Just create your own scheduler loop that sleeps and executes any tasks scheduled:
import asyncio
import math
import time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        async for task in scheduled_tasks(time.time()):
            await task()
The scheduled_tasks() iterator should produce tasks that are ready to be run at the given time. Note that producing the schedule and kicking off all the tasks could in theory take longer than 1 second; the idea here is that the scheduler yields all tasks that should have started since the last check.
Upvotes: 30
Reputation: 633
This is what I did to test my theory of periodic callbacks using asyncio. I don't have experience with Tornado, so I'm not sure exactly how its periodic callbacks work. I am used to the after(ms, callback) method in Tkinter, though, and this is what I came up with. A while True: loop just looks ugly to me, even if it is asynchronous (more so than globals). Note that the call_later(s, callback, *args) method takes seconds, not milliseconds.
import asyncio

my_var = 0

def update_forever(the_loop):
    global my_var
    print(my_var)
    my_var += 1
    # exit logic could be placed here
    the_loop.call_later(3, update_forever, the_loop)  # the method adds a delayed callback on completion

event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()
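The same call_later idea can be wrapped in a small class that takes milliseconds and can be stopped, which is closer to what the question asks for. This is only a sketch: the PeriodicCall name and its start()/stop() methods are assumptions made for illustration, not an asyncio API. Because each call is started by the event loop rather than by the previous call, the call stack does not grow over time.

```python
import asyncio

class PeriodicCall:
    """Call a plain function every `interval_ms` milliseconds (hypothetical helper)."""

    def __init__(self, callback, interval_ms):
        self.callback = callback
        self.interval = interval_ms / 1000.0  # call_later expects seconds
        self._handle = None

    def start(self):
        # Must be called while an event loop is running.
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self.interval, self._run, loop)

    def _run(self, loop):
        # Re-arm before calling, so stop() from inside the callback cancels the next run.
        self._handle = loop.call_later(self.interval, self._run, loop)
        self.callback()

    def stop(self):
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None
```

Inside a coroutine, PeriodicCall(some_function, 3000).start() would behave much like the update_forever example, and stop() replaces the "exit logic" comment.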
Upvotes: 0
Reputation: 766
Based on @A. Jesse Jiryu Davis's answer (with comments from @Torkel Bjørnson-Langen and @ReWrite), this is an improvement that avoids drift.
import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
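The @asyncio.coroutine decorator was removed in Python 3.11, so the code above only runs on older interpreters. A possible async/await version of the same drift-avoiding idea is sketched below. The func and count parameters are additions for illustration, and the sketch uses the loop's monotonic clock instead of time.time(); both are choices made here, not part of the original answer.

```python
import asyncio

async def periodic(period, func, count=None):
    """Call func() every `period` seconds against fixed deadlines.

    `count` (an illustrative extra) limits the number of calls; None means forever.
    """
    loop = asyncio.get_running_loop()
    start = loop.time()  # monotonic clock, unaffected by wall-clock adjustments
    n = 0
    while count is None or n < count:
        func()
        n += 1
        # Sleep until start + n * period; never pass a negative delay.
        await asyncio.sleep(max(start + n * period - loop.time(), 0))
```

Each deadline is computed from the original start time, so a slow iteration shortens the following sleep instead of pushing every later call back.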
Upvotes: 7
Reputation: 614
A variant that may be helpful: if you want your recurring call to happen every n seconds instead of n seconds between the end of the last execution and the beginning of the next, and you don't want calls to overlap in time, the following is simpler:
import asyncio

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )
And an example of using it to run a couple of tasks in the background:
async def f():
    await asyncio.sleep(1)
    print('Hello')

async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')

async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
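To see the timing rule described in the docstring, here is a small bounded variant that records when each iteration starts. The repeat_n, quick and slow names are made up for this sketch. Since gather waits for both func and the sleep, the gap between iterations should be about the larger of the interval and the duration of func.

```python
import asyncio
import time

async def repeat_n(interval, func, n):
    """Bounded variant of repeat(): returns the start time of each iteration."""
    starts = []
    for _ in range(n):
        starts.append(time.monotonic())
        # gather() returns only when both func() and the sleep have finished.
        await asyncio.gather(func(), asyncio.sleep(interval))
    return starts

async def quick():
    await asyncio.sleep(0.01)  # finishes well inside the interval

async def slow():
    await asyncio.sleep(0.08)  # overruns the interval
```

Running asyncio.run(repeat_n(0.05, quick, 3)) should give start times about 0.05 seconds apart, while the same call with slow should give gaps of about 0.08 seconds.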
Upvotes: 43
Reputation: 141
An alternative version using a decorator, for Python 3.7 and above:
import asyncio
import time

def periodic(period):
    def scheduler(fcn):
        async def wrapper(*args, **kwargs):
            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)
        return wrapper
    return scheduler

@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())

if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))
Upvotes: 14
Reputation: 24009
For Python versions below 3.5:
import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
For Python 3.5 and above:
import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
Upvotes: 107
Reputation: 39516
When you feel that something should happen "in the background" of your asyncio program, asyncio.Task might be a good way to do it. You can read this post to see how to work with tasks.
Here's a possible implementation of a class that executes some function periodically:
import asyncio
from contextlib import suppress

class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()
Let's test it:
async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Output:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
As you can see, on start we just start a task that calls the function and sleeps for some time in an endless loop. On stop we just cancel that task. Note that the task should be stopped by the time the program finishes.
One more important thing: your callback shouldn't take long to execute, or it will freeze your event loop. If you plan to call a long-running func, you will probably need to run it in an executor.
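A minimal sketch of the executor approach, assuming the long-running callback is an ordinary blocking function. The blocking_work and run_periodically names are hypothetical, and the times parameter exists only to keep the example finite. loop.run_in_executor(None, func) runs func in the loop's default thread pool, so the event loop stays free for other tasks.

```python
import asyncio
import time

def blocking_work():
    """Stand-in for a slow, blocking callback (hypothetical)."""
    time.sleep(0.05)
    return "done"

async def run_periodically(interval, func, times):
    """Run blocking `func` in the default executor, `times` times in total."""
    loop = asyncio.get_running_loop()
    results = []
    for _ in range(times):
        # The event loop keeps serving other tasks while func runs in a worker thread.
        results.append(await loop.run_in_executor(None, func))
        await asyncio.sleep(interval)
    return results
```

In the Periodic class above, the equivalent change would be to await loop.run_in_executor(None, self.func) inside _run instead of calling self.func() directly.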
Upvotes: 40