Thomas B.

Reputation: 721

How to use wait_for in a looping thread?

I want to have a thread (std::thread) that does its work every 60 seconds, otherwise sleeps and instantly returns if the outside requests it. std::jthread is not an option, I'm limited to C++14.

std::condition_variable cv;

//Thread1:
void ThreadWork()
{
    while(cv.wait_for(someLock, 60s) == std::cv_status::timeout)
    {
        Work();
    }
    return;
}

//Thread2:
void RequestEnd()
{
    cv.notify_one();
}

The idea here is that if the return value is std::cv_status::timeout, 60s have passed and we do the work normally. Otherwise, return.

Currently, I get some complaints from the runtime about the lock, or just straight up std::terminate.

Doing some tricks with a loop checking for an atomic variable is not what I want; this is about the thread sleeping.

Am I going in the right direction, at least?

Upvotes: 4

Views: 4287

Answers (5)

Thomas B.

Reputation: 721

std::future<void> seems to do the trick.

// Worker thread setup
void Work(std::future<void> killSwitch)
{
    while(killSwitch.wait_for(60s) == std::future_status::timeout)
    {
        // ... Stuff
    }
}

// Controller thread
std::promise<void> killSwitch;
std::thread worker{ Work, killSwitch.get_future() }; // get_future() already returns a temporary, so std::move is not needed

// ... Stuff

killSwitch.set_value(); // <-- this will break the loop
worker.join();

No mutexes, condition variables, spurious wakes, locks or atomics.
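For readers who want something they can compile, here is a minimal C++14 sketch of the same promise/future kill switch. The names PeriodicWorker and RunDemo, the std::function parameter, and the atomic counter are illustrative assumptions added for the demo, not part of the answer above.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <thread>

// Hypothetical worker: calls `work` once per `period` until the promise
// paired with `killSwitch` has been fulfilled.
void PeriodicWorker(std::future<void> killSwitch,
                    std::chrono::milliseconds period,
                    std::function<void()> work)
{
    // wait_for returns future_status::timeout when `period` elapsed without
    // set_value() having been called, and future_status::ready once it has.
    while (killSwitch.wait_for(period) == std::future_status::timeout)
    {
        work();
    }
}

// Hypothetical demo helper: starts the worker, lets it run for `runFor`,
// then stops it. Returns how many times the work ran.
int RunDemo(std::chrono::milliseconds period, std::chrono::milliseconds runFor)
{
    std::atomic<int> count{0};
    std::promise<void> killSwitch;

    std::thread worker(PeriodicWorker,
                       killSwitch.get_future(),
                       period,
                       std::function<void()>([&count] { ++count; }));

    std::this_thread::sleep_for(runFor);

    killSwitch.set_value();  // wakes the worker even in the middle of its wait
    worker.join();
    return count.load();
}
```

Calling RunDemo(std::chrono::seconds(60), std::chrono::milliseconds(50)) should return almost immediately with a count of 0, because set_value() interrupts the 60 second wait. Note that a promise can only be fulfilled once, so this is a one-shot stop signal.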

Upvotes: 5

Pepijn Kramer

Reputation: 13101

With threading you need to be very precise. Your idea of using a condition variable is correct, but if you try to stop the thread before it has started, the condition variable notification is missed and your thread will never stop. There are other issues with condition variables as well; see my class state_variable. This example is quite detailed, but it addresses all those issues and avoids race conditions.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>

//-----------------------------------------------------------------------------------------------------
// state of the thread that does the waiting
// used for stable starting and stopping

enum class thread_state
{
    idle,
    starting,
    running,
    stopping,
    stopped
};

//-----------------------------------------------------------------------------------------------------
// helper class for use of std::condition_variable, makes code more readable
// and takes into account the pitfalls of condition variables : 
// https://www.modernescpp.com/index.php/c-core-guidelines-be-aware-of-the-traps-of-condition-variables


template<typename T>
class state_variable
{
public:
    state_variable() = delete;
    state_variable(const state_variable&) = delete;
    state_variable(state_variable&&) = delete;
    state_variable& operator=(const state_variable&) = delete;

    explicit state_variable(const T& value) :
        m_value{ value }
    {
    }

    void operator=(const T& value) noexcept
    {
        {
            std::unique_lock<std::mutex> lock(m_value_mutex);
            m_value = value;
        }
        m_value_changed.notify_all();
    }

    // atomic check and set
    T set_if(const T& from_value, const T& to_value) noexcept
    {
        {
            std::unique_lock<std::mutex> lock(m_value_mutex);
            if (m_value != from_value) return from_value;
            m_value = to_value;
        }
        m_value_changed.notify_all();
        return to_value;
    }

    const bool try_wait_for(const T& value, const std::chrono::steady_clock::duration& duration) const noexcept
    {
        auto pred = [this, value] { return (m_value == value); };
        std::unique_lock<std::mutex> lock(m_value_mutex);
        if (pred()) return true;
        return m_value_changed.wait_for(lock, duration, pred);
    }

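    void wait_for(const T& value) const
    {
        // Wait without a timeout. Passing duration::max() to the timed wait
        // can overflow when it is added to the current time.
        auto pred = [this, value] { return (m_value == value); };
        std::unique_lock<std::mutex> lock(m_value_mutex);
        m_value_changed.wait(lock, pred);
    }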

private:
    // mutables so I could make the const promises on wait
    // that they won't change the observable state (m_value)
    // of this class.
    mutable std::mutex m_value_mutex;
    mutable std::condition_variable m_value_changed;
    std::atomic<T> m_value;
};

//-----------------------------------------------------------------------------------------------------

class Worker final
{
public:
    template<typename lambda_t>
    Worker(lambda_t lambda, const std::chrono::steady_clock::duration& loop_time) :
        m_state{ thread_state::idle },
        m_looptime{ loop_time },
        m_work{ lambda }
    {
    };

    Worker(const Worker&) = delete;
    Worker(Worker&&) = delete;
    Worker& operator=(const Worker&) = delete;

    void Start()
    {
        if (m_state.set_if(thread_state::idle, thread_state::starting) != thread_state::starting)
        {
            throw std::runtime_error("only an idle Worker can be started");
        }

        // 
        // Note that std::async, and std::thread don't guarantee the thread is active
        // when the call returns!
        // 
        // it is okay to capture "this" because the destructor of the 
        // Worker synchronizes with termination of this thread through the future
        // So the thread will have a shorter life cycle than the worker!
        //
        m_future = std::async(std::launch::async, [this]
        {
            // Set indication that the thread has really started.
            m_state = thread_state::running;

            do
            {
                m_work();
        
                // using a state_variable to check for stopping means it can respond
                // during the loop delay and stop immediately.
                // this is an advantage over using sleep
            } while (!m_state.try_wait_for(thread_state::stopping, m_looptime));

            m_state = thread_state::stopped;
        });

        // Wait for the thread to have really started
        // this way we have a clear post condition for start
        m_state.wait_for(thread_state::running);
    }

    void Stop()
    {
        // only allow a running Worker to be stopped.
        // in all other states Stop does nothing
        if (m_state.set_if(thread_state::running, thread_state::stopping) == thread_state::stopping)
        {
            // synchronization with stopped state, as set by other thread
            m_state.wait_for(thread_state::stopped);

            // future get is not really needed for synchronization.
            // but if thread threw an exception it's rethrown here 
            m_future.get();
        }
    }

    ~Worker()
    {
        // Automatically stop thread if this hasn't already happened.
        Stop();
    }

private:
    std::future<void> m_future;
    state_variable<thread_state> m_state;
    std::chrono::steady_clock::duration m_looptime;
    std::function<void()> m_work;
};


int main()
{
    auto work = []
    {
        std::cout << ".";
    };

    Worker worker(work, std::chrono::seconds(1)); // make 60 for your case and replace work with lambda or std::function<void()>

    std::cout << "Press enter to stop..." << std::endl;
    std::cout << "Every second Work() will be called, and a '.' be printed" << std::endl;
    std::string input;

    worker.Start();
    std::getline(std::cin,input);

    return 0;

    // destructor of worker takes care thread is stopped nicely before exiting the program!
}

Upvotes: 0

BitTickler

Reputation: 11947

My recommendation is NOT to use condition variables if that can be avoided. They have many pitfalls, and in the end you still do not have a general solution for "notifying a thread".

Let me describe the general case first, so you see where this is going:

In general, you want to

  • Do something after a fixed amount of time (or a scheduled point in time)
  • Receive application defined "messages" or "events", perhaps put into a queue by other threads
  • Wait for resources, such as socket handles, file handles, ...

A condition variable approach to your subset problem does not scale to when your requirements change and you need more features of the general case.

So, given the risk of getting it wrong with condition variables, and the fact that this approach lacks flexibility and does not account for future changes, I advise going straight to the general solution.

As I posted in my comment below the question, you might have to use system-specific APIs or use a library that does that for you and gives you a portable API. (I do not regularly check whether C++ in its latest mutation has added such a facility.)

Upvotes: 0

Yakk - Adam Nevraumont

Reputation: 275936

std::condition_variable is a low level primitive. Part of its design is that spurious wakeups happen; this means, sometimes people waiting on notifications will get notified even though nobody sent one.

I suspect there are a couple of reasons:

  1. The underlying mechanism on most OSes has such spurious wakeups. So the std library implementor would have to write the code to handle them if it hid them from the end user.

  2. In almost every use case of std::condition_variable, the code to handle threads moving faster/slower than expected ends up being connected to, and overlapping with, efficient spurious-wakeup handling code. So if the library handled it for you, you'd end up duplicating the work anyhow.

Your next problem is that the logic you have described is a bit vague. Time in a computer should not be treated as absolute; there is no "during the same 60s interval" in two different threads.

There is only happens-before some synchronization and happens-after that synchronization.

I suspect you might want a latch. A latch is a synchronization primitive (but less primitive than condition variable). Think of a latch on a door or a gate. It starts off closed, and you can open it; but once you open it, there is no way to close it again.

Here, the latch being "open" means "worker thread, cease your endless toil".

#include <chrono>
#include <condition_variable>
#include <mutex>

struct latch {
  void open_latch() {
    auto l = lock();
    open = true;
    cv.notify_all();
  }
  void wait() const {
    auto l = lock();
    cv.wait(l, [&]{ return open; });
  }
  template<class Rep, class Period>
  bool wait_for(const std::chrono::duration<Rep, Period>& duration) const {
    auto l = lock();
    return cv.wait_for(l, duration, [&]{ return open; });
  }
  template<class Clock, class Duration>
  bool wait_until(const std::chrono::time_point<Clock, Duration>& when) const {
    auto l = lock();
    return cv.wait_until(l, when, [&]{ return open; });
  }
private:
  // Explicit return type: a deduced (auto) return type cannot be used by the
  // member functions defined above this one.
  std::unique_lock<std::mutex> lock() const { return std::unique_lock<std::mutex>(m); }
  mutable std::mutex m;
  bool open = false;
  // mutable so the const wait functions can block on it
  mutable std::condition_variable cv;
};

now your code looks like:

latch l;

// Thread1:
void ThreadWork()
{
    while(!l.wait_for(60s))
    {
        Work();
    }
    return;
}

// Thread2:
void RequestEnd()
{
    l.open_latch();
}

(Code not tested, but this isn't my first rodeo).

There are a bunch of things this pattern handles, including the latch being opened before anyone waits on it.

I'd advise using wait_until instead of wait_for if you want X instances of work to occur after X minutes (note that if the work takes more than 1 minute, the waiting will be reduced to near zero time). If you instead want a 1 minute "break" between doing work, use wait_for.
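To make the wait_until variant concrete, here is a minimal C++14 sketch of a fixed-rate loop. StopFlag and RunAtFixedRate are hypothetical names introduced for this illustration; StopFlag is just a cut-down stand-in for the latch idea, and steady_clock is an assumed choice of clock.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot stop flag: once set, it stays set.
class StopFlag {
public:
    void request_stop() {
        {
            std::lock_guard<std::mutex> guard(m_);
            stop_ = true;
        }
        cv_.notify_all();
    }

    // Returns true if stop was requested before `deadline`, false on timeout.
    bool wait_until(std::chrono::steady_clock::time_point deadline) {
        std::unique_lock<std::mutex> lock(m_);
        return cv_.wait_until(lock, deadline, [this] { return stop_; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_ = false;
};

// Fixed-rate loop: each deadline is computed from the previous deadline, not
// from "now", so the time spent inside work() does not accumulate as drift.
// Returns the number of times work() ran.
template <class Work>
int RunAtFixedRate(StopFlag& stop,
                   std::chrono::steady_clock::duration period,
                   Work work)
{
    int runs = 0;
    auto next = std::chrono::steady_clock::now() + period;
    while (!stop.wait_until(next)) {
        work();
        ++runs;
        next += period;  // if work() overran, this may already be in the past
    }
    return runs;
}
```

If work() takes longer than the period, the next deadline has already passed, so wait_until returns at once and the loop catches up. Swapping in a wait_for(period) call would instead give a full pause after each run.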

Almost all uses of std::condition_variable have this three-part system: the mutex, the payload and the condition variable. The mutex should pretty much always guard the payload (even if atomic!) and only the payload. Sometimes the payload is two-part, like an abort flag and a more complex data structure.

Upvotes: 1

Alex Guteniev

Reputation: 13749

It is wrong to use condition_variable without another condition, due to spurious wakes. You need at least a bool variable.

The someLock should protect that other condition, and it should go locked into wait_for.

A lock acquired in a thread should be released in the same thread afterwards. (Usually, though not always, locks are stack variables).

Also, due to spurious wakes, the non-predicate wait_for is not convenient to use, as you need to recalculate the timeout. Prefer the predicate form, and check the condition in your predicate.
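Applied to the code in the question, these points could look roughly like the C++14 sketch below. The names stopMutex, stopCv and stopRequested are assumptions made for illustration, and ThreadWork takes the interval and the work as parameters only to keep the sketch self-contained.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex stopMutex;            // protects stopRequested
std::condition_variable stopCv;
bool stopRequested = false;      // the "other condition"

// Thread 1
template <class Work>
void ThreadWork(std::chrono::steady_clock::duration interval, Work work)
{
    // The lock is acquired and released in this thread, and it is held
    // when wait_for is entered.
    std::unique_lock<std::mutex> lock(stopMutex);

    // The predicate form returns the predicate's value: false means the
    // interval elapsed without a stop request, true means stop was requested.
    // Spurious wakes are absorbed by the predicate check.
    while (!stopCv.wait_for(lock, interval, [] { return stopRequested; }))
    {
        lock.unlock();  // do not hold the mutex while working
        work();
        lock.lock();
    }
}

// Thread 2
void RequestEnd()
{
    {
        std::lock_guard<std::mutex> guard(stopMutex);
        stopRequested = true;
    }
    stopCv.notify_one();
}
```

Because the flag is checked under the mutex before waiting, a RequestEnd() that happens before the worker reaches wait_for is not lost: the first wait_for returns true immediately.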

Upvotes: 4
