Anthea

Reputation: 3809

Shut down Boost threads correctly

I have x Boost threads that work at the same time. One producer thread fills a synchronised queue with calculation tasks. The consumer threads pop tasks off the queue and process them.

Synchronised queue (image source: https://www.quantnet.com/threads/c-multithreading-in-boost.10028/)

The user may quit the program during this process, so I need to shut down my threads properly. My current approach does not seem to work, since exceptions are thrown. It is intended that on system shutdown all processes are killed and stop their current task no matter what they are doing. Could you please show me how you would kill those threads?

Thread Initialisation:

    for (int i = 0; i < numberOfThreads; i++)
    {
        std::thread* thread = new std::thread(&MyManager::worker, this);
        mThreads.push_back(thread);
    }

Thread Destruction:

void MyManager::shutdown()
{
    for (int i = 0; i < numberOfThreads; i++)
    {
        mThreads.at(i)->join();
        delete mThreads.at(i);
    }
    mThreads.clear();
}

Worker:

void MyManager::worker()
{
    while (true)
    {

        int current = waitingList.pop();
        Object * p = objects.at(current);
        p->calculateMesh(); //this task is internally locked by a mutex

        try
        {
            boost::this_thread::interruption_point();
        }
        catch (const boost::thread_interrupted&)
        {
            // Thread interruption request received, break the loop
            std::cout << "- Thread interrupted. Exiting thread." << std::endl;
            break;
        }
    }
}

Synchronised Queue:

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue
{
public:

    T pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        while (queue_.empty())
        {
            cond_.wait(mlock);
        }
        auto item = queue_.front();
        queue_.pop();

        return item;
    }

    void push(const T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        queue_.push(item);
        mlock.unlock();
        cond_.notify_one();
    }


    int sizeIndicator()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        return queue_.size();
    }


private:

    bool isEmpty() {
        std::unique_lock<std::mutex> mlock(mutex_);
        return queue_.empty();
    }

    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

The thrown error call stack:

... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68   C++
... std::_Mutex_base::lock() Line 42    C++
... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220   C++
... ThreadSafeQueue<int>::pop() Line 13 C++
... MyManager::worker() Line 178   C++

Upvotes: 10

Views: 3143

Answers (4)

That Crazy Carl Guy

Reputation: 156

In my experience working with threads in both Boost and Java, trying to shut down threads externally is always messy. I have never been able to get that to work cleanly.

The best approach I've found is to give all consumer threads access to a boolean flag that starts out true (make it atomic, or guard it with a mutex, so the write is visible across threads). When you set it to false, the threads simply return on their own. In your case, the flag can easily serve as the condition of the while loop you already have.

On top of that, you need some synchronization so that you can wait for the threads to return before you delete them; otherwise you can get hard-to-diagnose behavior.

An example from a past project of mine:

Thread creation

barrier = new boost::barrier(numOfThreads + 1);
threads = new detail::updater_thread*[numOfThreads];

for (unsigned int t = 0; t < numOfThreads; t++) {
    //This object is just a wrapper class for the boost thread.
    threads[t] = new detail::updater_thread(barrier, this);
}

Thread destruction

for (unsigned int i = 0; i < numOfThreads; i++) {
    threads[i]->requestStop();//Notify all threads to stop.
}

barrier->wait();//The update request will allow the threads to get the message to shutdown.

for (unsigned int i = 0; i < numOfThreads; i++) {
    threads[i]->waitForStop();//Wait for all threads to stop.
    delete threads[i];//Now we are safe to clean up.
}

Some methods that may be of interest from the thread wrapper.

//Constructor
updater_thread::updater_thread(boost::barrier * barrier)
{
   this->barrier = barrier;
   running = true;

   thread = boost::thread(&updater_thread::run, this);
}

void updater_thread::run() {
    while (running) {
        barrier->wait();
        if (!running) break;

        //Do stuff

        barrier->wait();
    }
}

void updater_thread::requestStop() {
    running = false;
}

void updater_thread::waitForStop() {
    thread.join();
}
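
As a rough standard-library illustration of the same flag idea (this is not the project code above; StoppableWorker and its member names are made up for this sketch), the flag can be a std::atomic<bool> and the wait can be a plain join(). It assumes each unit of work is short and never blocks indefinitely. A worker that is stuck inside a blocking queue pop() will not notice the flag until it wakes up, so the queue would need its own wake-up mechanism.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical wrapper for illustration; not part of Boost or the question.
class StoppableWorker
{
public:
    StoppableWorker()
        : running_(true), iterations_(0), thread_(&StoppableWorker::run, this)
    {
    }

    // Stop and join on destruction so the std::thread is never destroyed
    // while still joinable (which would call std::terminate).
    ~StoppableWorker()
    {
        requestStop();
        waitForStop();
    }

    StoppableWorker(const StoppableWorker&) = delete;
    StoppableWorker& operator=(const StoppableWorker&) = delete;

    // Ask the worker to leave its loop after the current unit of work.
    void requestStop() { running_.store(false); }

    // Block until the worker thread has actually returned.
    void waitForStop()
    {
        if (thread_.joinable())
            thread_.join();
    }

    long iterations() const { return iterations_.load(); }

private:
    void run()
    {
        while (running_.load())
        {
            // Placeholder for one bounded unit of work.
            ++iterations_;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> running_;
    std::atomic<long> iterations_;
    std::thread thread_; // declared last so both flags exist before run() starts
};
```

Calling requestStop() on every worker first and waitForStop() afterwards lets all workers wind down in parallel instead of one after another.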

 

Upvotes: 3

trulstrengerikkealias

Reputation: 29

Maybe you are catching the wrong exception class, which would mean the exception never gets caught? I'm not too familiar with threads, but could the mix of std::thread and boost::thread be causing this?

Try catching the most general base exception class.

Upvotes: 0

user891558

Reputation: 23

I think this is the classic problem of reader and writer threads working on a common buffer. One of the most reliable ways to solve it is to use mutexes and signals. (I am not able to post the code here. Please send me an email and I will send the code to you.)
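
Since no code accompanies this answer, here is a minimal sketch of what a mutex-plus-signal shutdown could look like using only the standard library. It assumes "signals" means condition-variable notifications. ClosableQueue, close() and demoShutdown() are hypothetical names for illustration, not part of the question's code. Following the question's requirement to stop no matter what, pop() returns false as soon as the queue is closed, even if tasks are still pending.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical variant of the question's ThreadSafeQueue with a close() call.
template <typename T>
class ClosableQueue
{
public:
    // Blocks until an item is available or the queue is closed.
    // Returns false once close() has been called, even if items remain.
    bool pop(T& out)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (closed_)
            return false;
        out = queue_.front();
        queue_.pop();
        return true;
    }

    void push(const T& item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (closed_)
                return; // ignore work submitted after shutdown
            queue_.push(item);
        }
        cond_.notify_one();
    }

    // Marks the queue as closed and wakes every waiting consumer.
    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cond_.notify_all();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool closed_ = false;
};

// Usage sketch: starts `workers` consumers that block in pop(), closes the
// queue, joins them, and returns how many left their loop.
inline int demoShutdown(int workers)
{
    ClosableQueue<int> queue;
    std::atomic<int> exited{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i)
    {
        threads.emplace_back([&] {
            int task = 0;
            while (queue.pop(task))
            {
                // Process the task here.
            }
            ++exited;
        });
    }
    queue.close();
    for (auto& t : threads)
        t.join();
    return exited.load();
}
```

With a queue like this, the worker loop becomes while (queue.pop(task)) { ... } and shutdown() only has to call close() before joining the threads; no exception is needed to leave the loop.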

Upvotes: -2

marom

Reputation: 5230

Try moving the 'try' up so that it also covers the pop() call (as in the sample below). If your thread is waiting for data inside waitingList.pop(), it may be blocked in the condition variable's wait(). In Boost, that wait is an 'interruption point', so it may throw boost::thread_interrupted when the thread gets interrupted.

void MyManager::worker()
{
    while (true)
    {
        try
        {
            int current = waitingList.pop();
            Object * p = objects.at(current);
            p->calculateMesh(); //this task is internally locked by a mutex

            boost::this_thread::interruption_point();
        }
        catch (const boost::thread_interrupted&)
        {
            // Thread interruption request received, break the loop
            std::cout << "- Thread interrupted. Exiting thread." << std::endl;
            break;
        }
    }
}

Upvotes: 0
