Reputation: 419
I want to have an async Scheduler for "actions" execution, which satisfies cerain properties:
asyncio.sleep()
to let other coroutines to have their turns.asyncio.sleep()
, until new action is added.My attempt:
import asyncio
import time
class Action:
def __init__(self, timestamp):
self.timestamp = timestamp
async def do(self):
print("Doing action...")
class Scheduler:
def __init__(self):
self._actions = []
self._sleep_future = None
def add(self, action):
self._actions.append(action)
self._actions.sort(key=lambda x: x.timestamp)
if self._sleep_future:
self._sleep_future.cancel()
def pop(self):
return self._actions.pop(0)
async def start(self):
asyncio.create_task(self.loop())
async def loop(self):
while True:
now = time.time()
while self._actions:
action = self._actions[0]
if action.timestamp <= now:
action = self.pop()
await action.do()
else:
break
self._sleep_future = asyncio.ensure_future(
asyncio.sleep(self._actions[0].timestamp - now)
)
try:
await self._sleep_future
except asyncio.CancelledError:
continue
finally:
self._sleep_future = None
This implementation is not reliable and does not account the condition (5) that I seek!
Could you recommend me something?
Upvotes: 1
Views: 690
Reputation: 154886
The asyncio event loop already contains the code you were trying to implement - ordering timeouts and waiting for tasks to be submitted. You need to adapt the interface of the Scheduler
to the underlying asyncio functionality, for example like this:
class Scheduler:
def __init__(self):
self._running = asyncio.Lock()
async def start(self):
pass # asyncio event loop will do the actual work
def add(self, action):
loop = asyncio.get_event_loop()
# Can't use `call_at()` because event loop time uses a
# different timer than time.time().
loop.call_later(
action.timestamp - time.time(),
loop.create_task, self._execute(action)
)
async def _execute(self, action):
# Use a lock to ensure that no two actions run at
# the same time.
async with self._running:
await action.do()
Upvotes: 1