jpo38
jpo38

Reputation: 21544

How to correctly use condition_variable and time_wait/timed_join in threads?

I'm experiencing a wierd behaviour with thread. I isolated the problem in this simple example:

#include <iostream>
#include <boost/thread.hpp>

#define SLEEP_DURATION 1000

static boost::thread m_powerThread;
static boost::condition_variable m_exitThreadCond;
static boost::mutex m_exitThreadMutex;

void ThreadFunc()
{
    boost::mutex::scoped_lock lock( m_exitThreadMutex );
    std::cout << "Thread starts waiting" << std::endl;
    while ( !m_exitThreadCond.timed_wait( lock, boost::posix_time::milliseconds( SLEEP_DURATION ) ) )
    {
        // timed-out: m_exitThreadCond was not notified
        std::cout << "Hello" << std::endl;
    }
    // m_exitThreadCond was notified, let's exit
    std::cout << "Exiting thread" << std::endl;
}

int main( int argc, char* argv[] )
{
    m_powerThread = boost::thread( boost::bind<void>(&ThreadFunc) );

    // make sure thread started for good
    boost::this_thread::sleep( boost::posix_time::milliseconds( 500 ) );

    m_exitThreadMutex.lock();
    m_exitThreadCond.notify_all();
    m_exitThreadMutex.unlock();

    /* Then wait for the thread to exit. */
    if ( m_powerThread.timed_join( boost::posix_time::milliseconds( 2*SLEEP_DURATION ) ) )
    {
        std::cout << "OK, joined thread" << std::endl;
    }
    else
    {
        std::cout << "KO, failed to joined thread" << std::endl;
    }

    return 0;
}

It creates a thread tht will check a condition, if not notified within 1s, it will print "Hello", if condition is notified, thread exits. The main program set's the condition and calls timed_join.

When I run the program, it works fine, output is:

Thread starts waiting
Exiting thread
OK, joined thread

But When I add some breakpoints, it fails. I add a breakpoint to boost::this_thread::sleep( boost::posix_time::milliseconds( 500 ) ); and run the debugger (Visual Studio). It stops at my breakpoint, I step to next instruction, app sleeps for 500ms and I see "Thread starts waiting" in the console, then I step to the notify_all line and later timed_join, but I see timed_join returns false and console outputs:

Hello
Hello
KO, failed to joined threadHello

Hello

Am I using the condition/mutex badly? I undertsand stopping on breakpoints impacts application timing and thread but it should not make it fail in this specific case.

Notes:

Upvotes: 1

Views: 98

Answers (1)

Pepijn Kramer
Pepijn Kramer

Reputation: 13076

Here is what I mean in code, I didn't use boost (because I am not fully sure it works the same way as the std implementation, some names of functions are already a bit different).

#include <iostream>
#include <thread>
#include <chrono>
#include <condition_variable>

//#define SLEEP_DURATION 1000 <-- do NOT use #define

using namespace std::chrono_literals;

// NO GLOBALS, wrap in a class

class PowerFunction final
{
public:
    ~PowerFunction()
    {
        m_thread.join();
    }

    void Start()
    {
        m_thread = std::thread{ [&] { MainLoop(); } };
        WaitForThreadState(ThreadState::Running); // make sure thread started for good
    }

    void Stop()
    {
        SetThreadState(ThreadState::Stopping);
        WaitForThreadState(ThreadState::Stopped);
    }

private:
    // Your thread can be in various states, which will be used
    // in condition variable waits.
    // this will also ensure your startup can be synchronized (see Start)
    enum class ThreadState
    {
        Starting,
        Running,
        Stopping,
        Stopped
    };

    static constexpr std::chrono::steady_clock::duration LoopTime = 1s;

    void MainLoop()
    {
        SetThreadState(ThreadState::Running);
        do
        {
            std::cout << "Hello\n";
        } while (!WaitForThreadState(ThreadState::Stopping, LoopTime));
        std::cout << "Exiting thread\n";
        SetThreadState(ThreadState::Stopped);
    }

    void SetThreadState(const ThreadState state)
    {
        {
            std::unique_lock lock{ m_mtx };
            m_state = state;
        }
        m_cv.notify_all();
    }

    void WaitForThreadState(const ThreadState state)
    {
        std::unique_lock lock{ m_mtx };
        m_cv.wait(lock, [&] { return m_state == state; }); // <-- Wait with predicate
    }

    bool WaitForThreadState(const ThreadState state, const std::chrono::steady_clock::duration& duration)
    {
        std::unique_lock lock{ m_mtx };
        return m_cv.wait_for(lock, duration, [&] { return m_state == state; }); // <-- Wait with predicate AND timeout
    }

    std::thread m_thread;
    std::condition_variable m_cv;
    std::mutex m_mtx;
    ThreadState m_state{ ThreadState::Starting };
};


int main(int argc, char* argv[])
{
    PowerFunction powerFunction;
    powerFunction.Start();

    std::this_thread::sleep_for(5s); // let mainloop do its thing for a while

    powerFunction.Stop();
}

Upvotes: 2

Related Questions