Reputation: 3809
I have x boost threads that work at the same time. One producer thread fills a synchronised queue with calculation tasks. The consumer threads pop out tasks and calculates them.
Image Source: https://www.quantnet.com/threads/c-multithreading-in-boost.10028/
The user may finish the programm during this process, so I need to shutdown my threads properly. My current approach seems to not work, since exceptions are thrown. It's intented that on system shutdown all processes should be killed and stop their current task no matter what they do. Could you please show me, how you would kill thoses threads?
Thread Initialisation:
for (int i = 0; i < numberOfThreads; i++)
{
std::thread* thread = new std::thread(&MyManager::worker, this);
mThreads.push_back(thread);
}
Thread Destruction:
void MyManager::shutdown()
{
for (int i = 0; i < numberOfThreads; i++)
{
mThreads.at(i)->join();
delete mThreads.at(i);
}
mThreads.clear();
}
Worker:
void MyManager::worker()
{
while (true)
{
int current = waitingList.pop();
Object * p = objects.at(current);
p->calculateMesh(); //this task is internally locked by a mutex
try
{
boost::this_thread::interruption_point();
}
catch (const boost::thread_interrupted&)
{
// Thread interruption request received, break the loop
std::cout << "- Thread interrupted. Exiting thread." << std::endl;
break;
}
}
}
Synchronised Queue:
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
template <typename T>
class ThreadSafeQueue
{
public:
T pop()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
auto item = queue_.front();
queue_.pop();
return item;
}
void push(const T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
mlock.unlock();
cond_.notify_one();
}
int sizeIndicator()
{
std::unique_lock<std::mutex> mlock(mutex_);
return queue_.size();
}
private:
bool isEmpty() {
std::unique_lock<std::mutex> mlock(mutex_);
return queue_.empty();
}
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
};
The thrown error call stack:
... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68 C++
... std::_Mutex_base::lock() Line 42 C++
... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220 C++
... ThreadSafeQueue<int>::pop() Line 13 C++
... MyManager::worker() Zeile 178 C++
Upvotes: 10
Views: 3143
Reputation: 156
From my experience on working with threads in both Boost and Java, trying to shut down threads externally is always messy. I've never been able to really get that to work cleanly.
The best I've gotten is to have a boolean value available to all the consumer threads that is set to true. When you set it to false, the threads will simply return on their own. In your case, that could easily be put into the while loop you have.
On top of that, you're going to need some synchronization so that you can wait for the threads to return before you delete them, otherwise you can get some hard to define behavior.
An example from a past project of mine:
Thread creation
barrier = new boost::barrier(numOfThreads + 1);
threads = new detail::updater_thread*[numOfThreads];
for (unsigned int t = 0; t < numOfThreads; t++) {
//This object is just a wrapper class for the boost thread.
threads[t] = new detail::updater_thread(barrier, this);
}
Thread destruction
for (unsigned int i = 0; i < numOfThreads; i++) {
threads[i]->requestStop();//Notify all threads to stop.
}
barrier->wait();//The update request will allow the threads to get the message to shutdown.
for (unsigned int i = 0; i < numOfThreads; i++) {
threads[i]->waitForStop();//Wait for all threads to stop.
delete threads[i];//Now we are safe to clean up.
}
Some methods that may be of interest from the thread wrapper.
//Constructor
updater_thread::updater_thread(boost::barrier * barrier)
{
this->barrier = barrier;
running = true;
thread = boost::thread(&updater_thread::run, this);
}
void updater_thread::run() {
while (running) {
barrier->wait();
if (!running) break;
//Do stuff
barrier->wait();
}
}
void updater_thread::requestStop() {
running = false;
}
void updater_thread::waitForStop() {
thread.join();
}
Upvotes: 3
Reputation: 29
Maybe you are catching the wrong exception class? Which would mean it does not get caught. Not too familiar with threads but is it the mix of std::threads and boost::threads that is causing this?
Try catching the lowest parent exception.
Upvotes: 0
Reputation: 23
I think this is a classic problem of reader/writer thread working on a common buffer. One of the most secured way of working out this problem is to use mutexes and signals.( I am not able to post the code here. Please send me an email, I post the code to you).
Upvotes: -2
Reputation: 5230
Try moving 'try' up (like in the sample below). If your thread is waiting for data (inside waitingList.pop()) then may be waiting inside the condition variable .wait(). This is an 'interruption point' and so may throw when the thread gets interrupted.
void MyManager::worker()
{
while (true)
{
try
{
int current = waitingList.pop();
Object * p = objects.at(current);
p->calculateMesh(); //this task is internally locked by a mutex
boost::this_thread::interruption_point();
}
catch (const boost::thread_interrupted&)
{
// Thread interruption request received, break the loop
std::cout << "- Thread interrupted. Exiting thread." << std::endl;
break;
}
}
}
Upvotes: 0