Epic

Reputation: 622

How to schedule an operation to run at a future time

I have a class TaskManager that holds a queue of tasks. It repeatedly pops the next task and executes it.

class TaskManager
{
    TaskQueue m_queue;

    void svc_tasks()
    {
         while (!m_queue.empty())
         {
             Task* task = m_queue.pop();
             task->execute();
         }
    }
};

Inside the Task there are certain points at which I would like to pause for at least SLEEP_TIME_MS milliseconds. During this pause I would like to start executing the next task. When the pause ends I would like to put the task back in the queue.

class Task
{
    int m_phase = -1;

    void execute()
    {
        m_phase++;

        switch(m_phase)
        {
         case 0:
             ...
             do_pause(SLEEP_TIME_MS);
             return;
         case 1:
             ...
             break;
        }
    }
};

Is there a scheduler in the standard library (C++17) or Boost that I could use to call a handler function once SLEEP_TIME_MS has passed?

Thank you for any advice

Upvotes: 0

Views: 1455

Answers (1)

rafix07

Reputation: 20934

You can use boost::asio::high_resolution_timer with its async_wait method.

Each time you want to schedule pushing a task back into the queue, you have to:

  1. create a high_resolution_timer
  2. call expires_after, which sets the expiry time (SLEEP_TIME_MS), i.e. the moment the handler is called. In your case, the handler pushes the task into the queue.
  3. call async_wait with your handler

If we assume that the execute method returns a bool indicating whether the task is complete (all phases have been executed), the loop may be rewritten into something like this:

     while (!m_queue.empty()) // this condition should be changed
     {
         Task* task = m_queue.pop();
         bool finished = task->execute();
         if (!finished) {
            // the scheduler steps in here: start async_wait with a handler that re-queues the task
         }
     }

If I understand correctly, you want to push the task back into the queue once SLEEP_TIME_MS has elapsed, so you cannot break out of the loop when the queue is empty, because you have to wait until the pending tasks have completed. You can introduce a stop flag and break out of the loop on demand.


Below I put a snippet of code which works in the way you described (I hope):

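#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <boost/asio.hpp>

struct Scheduler {
    explicit Scheduler(boost::asio::io_context& io)
    : io(io) {}

    boost::asio::io_context& io;

    template<class F>
    void schedule (F&& handler) {
        // The shared_ptr captured by the lambda keeps the timer alive until it fires.
        auto timer = std::make_shared<boost::asio::high_resolution_timer>(io);
        timer->expires_after(std::chrono::milliseconds(5000)); // SLEEP_TIME_MS
        timer->async_wait(
            [timer, handler = std::forward<F>(handler)](const boost::system::error_code& ec) {
                if (!ec) // skip the handler if the wait was cancelled or failed
                    handler();
            });
    }
};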

struct Task  {
    int phase = -1;

    bool execute() {
        ++phase;
        std::cout << "phase: " << phase << std::endl;
        if (phase == 0) {
            return false;
        }
        else {

        }
        return true;
    }
};

struct TaskManager {
    Scheduler s;
    std::queue<std::shared_ptr<Task>> tasks;
    std::mutex tasksMtx;
    std::atomic<bool> stop{false};

    TaskManager(boost::asio::io_context& io) : s(io) {
        for (int i = 0; i < 5; ++i)
            tasks.push(std::make_shared<Task>());
    }

    void run() {
        while (!stop) {
            std::shared_ptr<Task> currTask;
            {
                // Take the next task while holding the lock: timer handlers
                // push into the queue from the io_context thread.
                std::lock_guard<std::mutex> lock{tasksMtx};
                if (tasks.empty())
                    continue; // busy-waits until a task is re-queued
                currTask = tasks.front();
                tasks.pop();
            }

            bool finished = currTask->execute();
            if (!finished)
                s.schedule( [this, currTask](){ insertTaskToVector(currTask); } );
        }
    }

    template<class T>
    void insertTaskToVector(T&& t) {
        std::lock_guard<std::mutex> lock{tasksMtx};
        tasks.push(std::forward<T>(t));
    }
};

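int main() {
    boost::asio::io_context io;
    // Keep io.run() from returning while no timer is pending.
    auto work = boost::asio::make_work_guard(io);
    std::thread th([&io](){ io.run();});
    TaskManager tm(io);
    tm.run(); // loops until tm.stop is set to true
    work.reset(); // let io.run() return once pending timers have fired
    th.join();
}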
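
For comparison, the same pause-and-re-queue behaviour can be sketched with the standard library only. This is not the Boost.Asio approach above: it swaps the timer thread for a min-heap of wake-up times handled on a single thread, so no mutex is needed and the loop sleeps instead of spinning when nothing is runnable. DemoTask, Sleeper and run_all are hypothetical names made up for this sketch, and the log parameter exists only to make the execution order visible.

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <string>
#include <thread>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical task: execute() returns true once all phases have run,
// mirroring the bool-returning execute() assumed in the answer.
struct DemoTask {
    std::string name;
    int phases;      // total number of phases
    int phase = -1;

    bool execute(std::vector<std::string>& log) {
        ++phase;
        log.push_back(name + ":" + std::to_string(phase));
        return phase + 1 >= phases;
    }
};

// A paused task together with the earliest time at which it may run again.
struct Sleeper {
    Clock::time_point wake;
    DemoTask* task;
    bool operator>(const Sleeper& other) const { return wake > other.wake; }
};

// Runs every task to completion on the calling thread. A task that is not
// finished is parked for at least `pause` and re-queued afterwards.
std::vector<std::string> run_all(std::vector<DemoTask>& tasks,
                                 std::chrono::milliseconds pause) {
    std::vector<std::string> log;
    std::queue<DemoTask*> ready;
    // std::greater turns the priority queue into a min-heap on wake time.
    std::priority_queue<Sleeper, std::vector<Sleeper>, std::greater<Sleeper>> sleeping;
    for (auto& t : tasks) ready.push(&t);

    while (!ready.empty() || !sleeping.empty()) {
        // Re-queue every paused task whose wake-up time has passed.
        while (!sleeping.empty() && sleeping.top().wake <= Clock::now()) {
            ready.push(sleeping.top().task);
            sleeping.pop();
        }
        if (ready.empty()) {
            // Nothing runnable: block until the earliest wake-up time.
            std::this_thread::sleep_until(sleeping.top().wake);
            continue;
        }
        DemoTask* task = ready.front();
        ready.pop();
        if (!task->execute(log))
            sleeping.push({Clock::now() + pause, task});
    }
    return log;
}
```

Calling run_all with a task A that has two phases, a task B that has one phase, and a pause of 20 ms yields the log A:0, B:0, A:1: B runs while A is paused, and A resumes only after its pause has elapsed. If tasks have to be added from other threads while the loop runs, this sketch would need a mutex and a condition variable, or the Boost.Asio version above.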

Upvotes: 1
