Reputation: 19331
I am moving a C program that uses pthreads over to C++ and will have to make extensive use of the Boost library to make the program multi-platform, portable, etc.
When working with a thread in the past, my code has usually had the form:
    void threadFunc(void* pUserData)
    {
        if ( !pUserData )
        {
            return;
        }
        myStruct* pData = (myStruct*)pUserData;
        bool bRun;
        lock(pUserData);
        bRun = pData->bRun;   // read through the typed pointer; a void* cannot be dereferenced
        unlock(pUserData);
        while ( bRun )
        {
            // Do some stuff
            // ...
            // ...
            lock(pUserData);
            bRun = pData->bRun;
            unlock(pUserData);
        }
        /* Done execution of thread; External thread must have set pUserData->bRun
         * to FALSE.
         */
    }
This works as I would expect. When I want the thread to shut down, I just lock the mutex for the struct it's accessing (typically a member of the struct), toggle the bRun flag, and join() on the thread. For boost threads, I've noticed cases with my code where a timed_join() times out when doing non-blocking operations, similar to this SO question. This leads me to suspect I'm not using boost threads correctly.
First, which of the two general thread structures would be correct for having a thread properly catch the thread_interrupted exception?
Case A:
    void threadFunc( void* pUserData )
    {
        while ( 1 )
        {
            try
            {
                // Do some stuff
                // ...
                // ...
                boost::this_thread::sleep(boost::posix_time::milliseconds(1));
            }
            catch(boost::thread_interrupted const& )
            {
                // Thread interrupted; Clean up
            }
        }
    }
Case B:
    void threadFunc( void* pUserData )
    {
        try
        {
            while ( 1 )
            {
                // Do some stuff
                // ...
                // ...
                boost::this_thread::sleep(boost::posix_time::milliseconds(1));
            }
        }
        catch(boost::thread_interrupted const& )
        {
            // Thread interrupted; Clean up
        }
    }
Second, what would be the appropriate boost function to call in place of sleep if I want the thread to have a chance to catch the interrupt call, but not to sleep() or give up the remainder of the CPU time slice it currently holds? From another SO question on this topic, it seems that the boost::this_thread::interruption_point() call might be what I'm looking for, but I'm not 100% sure it will always work, from what I read in the SO question I referenced it from.
Finally, it is my understanding that not calling any manner of boost sleep() function or some similar interruption point function in my loop will mean that a timed_join() will always time out, and I'll either have to:
Is this assumption correct?
Thank you.
<https://stackoverflow.com/questions/7316633/boost-thread-how-to-acknowledge-interrupt>, Accessed 2016-01-18.
<https://stackoverflow.com/questions/26103648/boostthis-threadinterruption-point-doesnt-throw-boostthread-interrupted>, Accessed 2016-01-18.
Upvotes: 3
Views: 2281
Reputation: 393839
Your code is not exception safe. Therefore you're prone to deadlock when you await the join.
If you receive the exception while holding the mutex, your code will never unlock the mutex, likely leading to deadlock in the waiting threads.
Things to check:
First and foremost, your code doesn't show anywhere how you create the threads that you try to interrupt. You need to be running it on boost::thread-governed thread instances.
This should be obvious, because it will be hard to call boost::thread::interrupt() without such an object. Still, I'd like to double-check that this is in order, because the signature of the thread function strongly suggests native POSIX pthread_create usage.
A good idea is to check the return value of boost::this_thread::interruption_enabled() (which will be false for native threads):
"The boost::this_thread interrupt related functions behave in a degraded mode when called from a thread created using the native interface, i.e. boost::this_thread::interruption_enabled() returns false. As consequence the use of boost::this_thread::disable_interruption and boost::this_thread::restore_interruption will do nothing and calls to boost::this_thread::interruption_point() will be just ignored."
How are the lock/unlock functions implemented? Again, the same goes here: if you mix with POSIX/non-Boost APIs, then you might miss the signal.
While you're at it, start using lock_guard or unique_lock so your code is exception safe.
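To illustrate the exception-safety point, here is a minimal sketch. It is not taken from the question's code: Interrupted is a hypothetical stand-in for boost::thread_interrupted, TrackedMutex is a hypothetical lockable that only records whether it is held, and std::lock_guard is used in place of boost::lock_guard purely to keep the example self-contained.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for boost::thread_interrupted.
struct Interrupted {};

// Hypothetical BasicLockable that only records whether it is held,
// so the effect of each locking style can be observed safely.
struct TrackedMutex {
    bool held = false;
    void lock()   { held = true; }
    void unlock() { held = false; }
};

// Hypothetical work function that gets "interrupted" part-way through.
void interruptedWork() { throw Interrupted{}; }

// Manual lock/unlock: the exception skips the unlock() call.
bool manualLeavesMutexHeld() {
    TrackedMutex mx;
    try {
        mx.lock();
        interruptedWork();
        mx.unlock();               // never reached
    } catch (Interrupted const&) {
    }
    return mx.held;
}

// RAII lock: the guard's destructor unlocks during stack unwinding.
bool guardLeavesMutexHeld() {
    TrackedMutex mx;
    try {
        std::lock_guard<TrackedMutex> lk(mx);
        interruptedWork();
    } catch (Interrupted const&) {
    }
    return mx.held;
}
```

With manual locking the mutex is still held after the exception, which is the situation that can leave waiting threads deadlocked; with the guard it is released automatically.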
Regarding your Case A vs Case B question, Case B is simplest.
Case A doesn't show how you plan to exit the while loop. You could of course use break, return (or even goto), but as you showed it, it will be an infinite loop.
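The structural difference between the two cases can be sketched without Boost. In this sketch, Interrupted and checkpoint() are hypothetical stand-ins for boost::thread_interrupted and an interruption point, and the budget parameter is an assumption used only to make the "interrupt" arrive at a predictable moment.

```cpp
#include <cassert>

// Hypothetical stand-in for boost::thread_interrupted.
struct Interrupted {};

// Hypothetical interruption point: throws once `budget` iterations are used up.
void checkpoint(int& budget) {
    if (budget-- == 0) throw Interrupted{};
}

// Case A: try/catch inside the loop. The catch block has to leave the
// loop explicitly (here with break); otherwise the loop just carries on.
int caseA(int budget) {
    int iterations = 0;
    while (true) {
        try {
            checkpoint(budget);
            ++iterations;          // "do some stuff"
        } catch (Interrupted const&) {
            break;                 // explicit exit required in Case A
        }
    }
    return iterations;
}

// Case B: try/catch around the loop. The exception itself exits the loop.
int caseB(int budget) {
    int iterations = 0;
    try {
        while (true) {
            checkpoint(budget);
            ++iterations;          // "do some stuff"
        }
    } catch (Interrupted const&) {
        // clean up
    }
    return iterations;
}
```

Both functions perform the same number of iterations before stopping. The only difference is that Case A needs the extra break to terminate.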
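Apart from sleep() there is yield(), which expressly gives up the remainder of the timeslice. Note, however, that the Boost documentation does not list yield() among the predefined interruption points, so combine it with interruption_point() if the thread must react to interrupt().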
I know you've said you don't want to give up the timeslice, but I'm not sure you realize this is going to cause the thread to burn CPU if there's no work to be done.
It strikes me as highly contradictory that you wish not to give up the remainder of the timeslice (indicating low-latency, high-bandwidth applications and a lock-free context) while talking about using a mutex at the same time. The latter, obviously, is a lot more expensive than merely yield-ing a partial timeslice, as it's the opposite of lock-free concurrency.
So let me conclude with two demos of good style, one lockful and one lockless (or lock-conservative, depending on the rest of your thread synchronization logic).
This uses the "old" thread control because it's fine when locking, IMO:
    #include <boost/thread.hpp>
    #include <iostream>
    #include <string>
    #include <vector>

    void trace(std::string const& msg);

    struct myStruct {
        bool keepRunning = true;
        using mutex_t = boost::mutex;
        using lock_t  = boost::unique_lock<mutex_t>;
        lock_t lock() const { return lock_t(mx); }
      private:
        mutex_t mutable mx;
    };

    void threadFunc(myStruct &userData) {
        trace("threadFunc enter");
        do
        {
            {
                auto lock = userData.lock();
                if (!userData.keepRunning)
                    break;
            } // destructor of unique_lock unlocks, exception safe
            // Do some stuff
            // ...
            // ...
            trace("(work)");
            boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        } while (true);
        trace("threadFunc exit");
    }

    int main() {
        trace("Starting main");
        boost::thread_group threads;
        constexpr int N = 4;
        std::vector<myStruct> data(N);
        for (int i=0; i < N; ++i)
            threads.create_thread(boost::bind(threadFunc, boost::ref(data[i])));
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
        trace("Main signaling shutdown");
        for (auto& d : data) {
            auto lock = d.lock();   // hold the lock while toggling the flag
            d.keepRunning = false;
        }
        threads.join_all();
        trace("Bye");
    }

    // Serializes output and assigns each thread a small id on first use
    void trace(std::string const& msg) {
        static boost::mutex mx;
        boost::lock_guard<boost::mutex> lk(mx);
        static int thread_id_gen = 0;
        thread_local int thread_id = thread_id_gen++;
        std::cout << "Thread #" << thread_id << ": " << msg << "\n";
    }
Example output:
Thread #0: Starting main
Thread #1: threadFunc enter
Thread #1: (work)
Thread #2: threadFunc enter
Thread #2: (work)
Thread #3: threadFunc enter
Thread #3: (work)
Thread #4: threadFunc enter
Thread #4: (work)
Thread #3: (work)
Thread #1: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #2: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #2: (work)
Thread #1: (work)
Thread #4: (work)
Thread #3: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #0: Main signaling shutdown
Thread #4: threadFunc exit
Thread #2: threadFunc exit
Thread #1: threadFunc exit
Thread #3: threadFunc exit
Thread #0: Bye
A literal translation of the previous lockful example:
However, it might make more sense now to have a single, shared, shutdown flag:
    #include <boost/atomic.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    #include <string>

    void trace(std::string const& msg);

    struct myStruct {
        int value;
    };

    void threadFunc(myStruct userData, boost::atomic_bool& keepRunning) {
        std::string valuestr = std::to_string(userData.value);
        trace("threadFunc enter(" + valuestr + ")");
        do
        {
            if (!keepRunning)   // atomic read, no mutex required
                break;
            // Do some stuff
            // ...
            // ...
            trace("(work" + valuestr + ")");
            boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        } while (true);
        trace("threadFunc " + valuestr + " exit");
    }

    int main() {
        boost::atomic_bool running { true };
        trace("Starting main");
        boost::thread_group threads;
        constexpr int N = 4;
        for (int i=0; i < N; ++i) {
            threads.create_thread(
                boost::bind(threadFunc, myStruct{i}, boost::ref(running)
            ));
        }
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
        trace("Main signaling shutdown");
        running = false;
        threads.join_all();
        trace("Bye");
    }

    // Serializes output and assigns each thread a small id on first use
    void trace(std::string const& msg) {
        static boost::mutex mx;
        boost::lock_guard<boost::mutex> lk(mx);
        static int thread_id_gen = 0;
        thread_local int thread_id = thread_id_gen++;
        std::cout << "Thread #" << thread_id << ": " << msg << "\n";
    }
Note how it's a lot more "natural C++" the way we can just pass data into the threadFunc using boost::bind¹.
With interruption_point()
Note that the docs for interrupt() state:
"Effects: If *this refers to a thread of execution, request that the thread will be interrupted the next time it enters one of the predefined interruption points with interruption enabled, or if it is currently blocked in a call to one of the predefined interruption points with interruption enabled" [emphasis mine]
It's worth noting that interruption can be temporarily disabled. Boost only provides RAII-enabled classes for this, so the default would be exception safe. If your code doesn't use either class disable_interruption or class restore_interruption, you don't have to worry about this.
    #include <boost/thread.hpp>
    #include <iostream>
    #include <string>

    void trace(std::string const& msg);

    struct myStruct {
        int value;
    };

    void threadFunc(myStruct userData) {
        std::string valuestr = std::to_string(userData.value);
        trace("threadFunc enter(" + valuestr + ")");
        // using `Case B` form from your question
        try {
            do
            {
                boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); // Avoids huge output for demo
                trace("(work" + valuestr + ")");
                boost::this_thread::interruption_point();
            } while (true);
        } catch(boost::thread_interrupted const& tie) {
            trace("threadFunc " + valuestr + " interrupted");
        }
        trace("threadFunc " + valuestr + " exit");
    }

    int main() {
        trace("Starting main");
        boost::thread_group threads;
        constexpr int N = 4;
        for (int i=0; i < N; ++i) {
            threads.create_thread(boost::bind(threadFunc, myStruct{i}));
        }
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
        trace("Main signaling shutdown");
        threads.interrupt_all();
        threads.join_all();
        trace("Bye");
    }

    // Serializes output and assigns each thread a small id on first use
    void trace(std::string const& msg) {
        static boost::mutex mx;
        boost::lock_guard<boost::mutex> lk(mx);
        static int thread_id_gen = 0;
        thread_local int thread_id = thread_id_gen++;
        std::cout << "Thread #" << thread_id << ": " << msg << "\n";
    }
¹ or lambdas if you prefer
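As a rough sketch of the lambda alternative mentioned in the footnote: the example below uses std::thread and std::atomic rather than the Boost types only so that it stays self-contained, and runWorkers with its return value is a hypothetical helper invented for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

struct myStruct {
    int value;
};

// Hypothetical helper: starts n workers via lambdas, signals shutdown,
// joins them, and returns the sum of the values the workers were given.
int runWorkers(int n) {
    std::atomic<bool> keepRunning{true};
    std::atomic<int> sum{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < n; ++i) {
        myStruct data{i};
        // data is captured by value, the shared atomics by reference
        threads.emplace_back([data, &keepRunning, &sum] {
            while (keepRunning) {
                std::this_thread::yield();   // placeholder for real work
            }
            sum += data.value;               // runs once, after shutdown is seen
        });
    }
    keepRunning = false;                     // single shared shutdown flag
    for (auto& t : threads) {
        t.join();
    }
    return sum;
}
```

The capture list plays the role of the boost::bind arguments: each worker gets its own copy of the data, while the shutdown flag is shared by reference.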
Upvotes: 3