Sergey Dylda

Reputation: 419

Python: How to implement async scheduler with sequential execution?

I want an async Scheduler for executing "actions" that satisfies certain properties:

  1. Actions are one-time and are scheduled at exact timestamps.
  2. Actions should be executed in strictly sequential order, i.e. the scheduler cannot launch the next action until the previous one has finished executing.
  3. Between the execution of actions, while the scheduler waits for the next timestamp, it must be in a state of asyncio.sleep() so that other coroutines can take their turns.
  4. When a new action is scheduled, the scheduler should immediately readjust its waiting time, so that it always waits for the soonest action.
  5. When no actions are scheduled, the scheduler should stay in a permanent state of asyncio.sleep() until a new action is added.

My attempt:


import asyncio
import time

class Action:

    def __init__(self, timestamp):
        self.timestamp = timestamp

    async def do(self):
        print("Doing action...")

class Scheduler:

    def __init__(self):
        self._actions = []
        self._sleep_future = None

    def add(self, action):
        self._actions.append(action)
        self._actions.sort(key=lambda x: x.timestamp)

        if self._sleep_future:
            self._sleep_future.cancel()

    def pop(self):
        return self._actions.pop(0)

    async def start(self):
        asyncio.create_task(self.loop())

    async def loop(self):
        while True:
            now = time.time()
            while self._actions:
                action = self._actions[0]
                if action.timestamp <= now:
                    action = self.pop()
                    await action.do()
                else:
                    break

            self._sleep_future = asyncio.ensure_future(
                asyncio.sleep(self._actions[0].timestamp - now)
            )

            try:
                await self._sleep_future
            except asyncio.CancelledError:
                continue
            finally:
                self._sleep_future = None


This implementation is not reliable and does not account for condition (5)!

Could you recommend a better approach?

Upvotes: 1

Views: 690

Answers (1)

user4815162342

Reputation: 154886

The asyncio event loop already contains the code you were trying to implement: it orders timeouts and waits for tasks to be submitted. You only need to adapt the interface of the Scheduler to the underlying asyncio functionality, for example like this:

import asyncio
import time

class Scheduler:
    def __init__(self):
        self._running = asyncio.Lock()

    async def start(self):
        pass  # asyncio event loop will do the actual work

    def add(self, action):
        loop = asyncio.get_event_loop()
        # Can't use `call_at()` because event loop time uses a
        # different timer than time.time().
        loop.call_later(
            action.timestamp - time.time(),
            loop.create_task, self._execute(action)
        )

    async def _execute(self, action):
        # Use a lock to ensure that no two actions run at
        # the same time.
        async with self._running:
            await action.do()
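
To see the ordering and the lock in action, here is a minimal sketch that repeats the Scheduler above and drives it with a made-up action type. RecordingAction, its name and log parameters, and the demo() coroutine are assumptions introduced only for this illustration; they are not part of the question. The actions are added out of order and the first one runs long enough to overlap the second one's timestamp, so the second has to wait for the lock:

```python
import asyncio
import time


class Scheduler:
    def __init__(self):
        self._running = asyncio.Lock()

    def add(self, action):
        # Called from inside a coroutine, so this returns the running loop.
        loop = asyncio.get_event_loop()
        loop.call_later(
            action.timestamp - time.time(),
            loop.create_task, self._execute(action)
        )

    async def _execute(self, action):
        # Only one action may hold the lock, so actions never overlap.
        async with self._running:
            await action.do()


class RecordingAction:
    # Hypothetical action for this demo: it appends to a shared log.
    def __init__(self, name, timestamp, log):
        self.name = name
        self.timestamp = timestamp
        self.log = log

    async def do(self):
        self.log.append(f"{self.name} start")
        await asyncio.sleep(0.15)  # simulate work that takes a while
        self.log.append(f"{self.name} end")


async def demo():
    log = []
    scheduler = Scheduler()
    now = time.time()
    # Added out of order; "late" becomes due while "early" is still running.
    scheduler.add(RecordingAction("late", now + 0.10, log))
    scheduler.add(RecordingAction("early", now + 0.02, log))
    await asyncio.sleep(0.5)  # give both actions time to finish
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With these timings, "early" should start and finish before "late" starts, even though "late" was added first and its timer fires while "early" is still running.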

Upvotes: 1
