Reputation: 2365
Until now, when using threads, I've always started them right away in my program and then made them wait on notification from a main control thread.
std::vector<std::thread> threads;
for(int i = 0; i != thread_count; ++i) {
threads.push_back(std::thread(&MyClass::myfunction, this));
}
/* some time later in the code */
for(auto& t : threads) {
t.join();
}
Now I want to start threads on demand from a function run by my control thread, but I'm unsure how to handle the thread objects and their joining.
The following would push a new thread object onto the vector for each call, which strikes me as not ideal:
// Append one new thread per full buffer; the vector grows with every launch
// and nothing in this fragment joins the threads it creates.
std::vector<std::thread> threads;
while (accumulating_data) {
    if (!buffer_full) {
        continue;
    }
    threads.emplace_back(&MyClass::myfunction, this);
}
Having the vector hold no more than the maximum number of concurrently running threads seems preferable. I also don't know how to join the threads here without blocking the control thread.
If I do something like this instead:
// dummy code, in my real code I have a queue of idle IDs
// Pre-size the vector with thread_count default-constructed (non-joinable)
// std::thread objects and reuse the slots round-robin.
std::vector<std::thread> threads(thread_count);
while(accumulating_data) {
if(buffer_full) {
// Move-assign a freshly started thread into the current slot.
// NOTE(review): if the slot still holds a joinable thread, std::thread's
// move assignment calls std::terminate -- presumably the crash described
// in the surrounding text.
threads[thread_id] = std::thread(&MyClass::myfunction, this);
// Advance to the next slot, wrapping back to 0 after the last one.
if(++thread_id == thread_count) { thread_id = 0; }
}
}
...I quickly crash, probably because I haven't joined or am reassigning to a vector element that already contains a std::thread object.
Any hints on how I can accomplish my goal of starting threads on demand, instead of having them wait?
Update:
I managed to get the code to run without crashing by introducing a std::thread::joinable()
check. I'm still open to opinions on how to handle this more elegantly, so I won't make it the answer to my own question:
// Reuse a fixed set of thread_count slots round-robin. Before a slot is
// reused, its previous thread is joined, because assigning to a std::thread
// that is still joinable calls std::terminate.
std::vector<std::thread> threads(thread_count);
while(accumulating_data) {
    if(buffer_full) {
        // joinable() is false for a default-constructed slot, so the first
        // pass over each slot skips the join.
        if(threads[thread_id].joinable()) {
            threads[thread_id].join();
        }
        threads[thread_id] = std::thread(&MyClass::myfunction, this);
        // Advance to the next slot, wrapping back to 0 after the last one.
        if(++thread_id == thread_count) { thread_id = 0; }
    }
}
Upvotes: 0
Views: 243
Reputation: 23500
Not sure if this is what you want..
#include <cstdint>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
/// Fixed-capacity set of thread slots whose threads are started on demand.
///
/// Every slot stores three things: the native handle of its thread (which is
/// also the id handed back to the caller), the std::thread object, and a
/// pointer to a caller-owned stop flag. When a slot is released, the flag
/// decides what happens to the thread: if it is true the thread is joined,
/// otherwise it is detached.
///
/// NOTE(review): the stop flag is a plain bool that this class reads while the
/// worker typically reads it on another thread; nothing here synchronizes
/// those accesses -- confirm callers provide their own synchronization.
class ThreadGroup
{
private:
    std::uint32_t max_threads;
    std::vector<std::tuple<std::thread::native_handle_type, std::thread, bool*>> data;

    /// Slot count used by the default constructor. hardware_concurrency() is
    /// allowed to return 0 when the value is unknown; fall back to one slot so
    /// a default-constructed group is never permanently full.
    static std::uint32_t default_slot_count()
    {
        const unsigned int hw = std::thread::hardware_concurrency();
        return hw == 0 ? 1u : static_cast<std::uint32_t>(hw);
    }

public:
    /// Creates a group with one slot per hardware thread (at least one).
    ThreadGroup() : ThreadGroup(default_slot_count()) {}

    /// Creates a group with exactly @p max_threads empty slots.
    ThreadGroup(std::uint32_t max_threads) : max_threads(max_threads), data(max_threads) {}

    /// Joins (stop flag true) or detaches (stop flag false) every thread that
    /// is still held by the group.
    ~ThreadGroup();

    // The slots own std::thread objects, which cannot be copied.
    ThreadGroup(const ThreadGroup&) = delete;
    ThreadGroup& operator=(const ThreadGroup&) = delete;

    /// Starts `f(args...)` on a new thread in the first free slot.
    ///
    /// @param terminate Caller-owned stop flag; its address is stored, so it
    ///                  must outlive the slot's use of the thread.
    /// @param f         Callable to run on the new thread.
    /// @param args      Arguments forwarded to @p f.
    /// @return The native handle of the new thread, usable with remove().
    /// @throws std::length_error if every slot is occupied.
    template<class Function, class... Args>
    std::thread::native_handle_type insert(bool &terminate, Function&& f, Args&&... args);

    /// Releases the slot whose thread has native handle @p id.
    ///
    /// @return true if the thread was joined (its stop flag was true); false
    ///         if it was detached instead or no occupied slot matches @p id.
    bool remove(std::thread::native_handle_type id);
};

ThreadGroup::~ThreadGroup()
{
    for (auto &slot : data)
    {
        std::thread &worker = std::get<1>(slot);
        if (!worker.joinable())
        {
            continue; // empty slot, or already released through remove()
        }

        const bool *stop_flag = std::get<2>(slot);
        if (stop_flag != nullptr && *stop_flag)
        {
            worker.join();
        }
        else
        {
            // A joinable std::thread must not be destroyed, so let it run on.
            worker.detach();
        }
    }
}

template<class Function, class... Args>
std::thread::native_handle_type ThreadGroup::insert(bool &terminate, Function&& f, Args&&... args)
{
    for (auto &slot : data)
    {
        std::thread &worker = std::get<1>(slot);
        if (worker.joinable())
        {
            continue; // slot still owns a thread
        }

        // std::forward needs its template argument spelled out; without it the
        // call only compiles when the argument pack is empty.
        worker = std::thread(std::forward<Function>(f), std::forward<Args>(args)...);
        std::get<0>(slot) = worker.native_handle();
        std::get<2>(slot) = &terminate;
        return std::get<0>(slot);
    }
    throw std::length_error("Maximum thread limit reached.");
}

bool ThreadGroup::remove(std::thread::native_handle_type id)
{
    for (auto &slot : data)
    {
        std::thread &worker = std::get<1>(slot);
        // Skip empty slots: their handle is value-initialized and could match
        // a zero id, and detach() on a non-joinable thread throws.
        if (!worker.joinable() || std::get<0>(slot) != id)
        {
            continue;
        }

        const bool *stop_flag = std::get<2>(slot);
        const bool joined = (stop_flag != nullptr && *stop_flag);
        if (joined)
        {
            worker.join();
        }
        else
        {
            worker.detach();
        }

        // Mark the slot as free for the next insert().
        std::get<0>(slot) = std::thread::native_handle_type{};
        std::get<2>(slot) = nullptr;
        return joined;
    }
    return false;
}
Then I used it like:
#include <chrono>
#include <iostream>
#include <thread>
// One stop flag per worker; a worker keeps looping until its flag becomes true.
// NOTE(review): these are plain bools written by main and read by the workers
// without synchronization -- confirm whether std::atomic<bool> is wanted.
bool terminate1 = false;
bool terminate2 = false;
bool terminate3 = false;
bool terminate4 = false;
bool terminate5 = false;

// Shared worker body: print `tag` and sleep one second, repeatedly, until
// `stop` reads true. The flag is checked before every iteration.
static void announce_until_stopped(const bool &stop, const char *tag)
{
    for (;;)
    {
        if (stop)
        {
            return;
        }
        std::cout << tag;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

// Worker 1: prints "T1 " once per second until terminate1 is set.
void func1()
{
    announce_until_stopped(terminate1, "T1 ");
}

// Worker 2: prints "T2 " once per second until terminate2 is set.
void func2()
{
    announce_until_stopped(terminate2, "T2 ");
}

// Worker 3: prints "T3 " once per second until terminate3 is set.
void func3()
{
    announce_until_stopped(terminate3, "T3 ");
}

// Worker 4: prints "T4 " once per second until terminate4 is set.
void func4()
{
    announce_until_stopped(terminate4, "T4 ");
}

// Worker 5: prints "T5 " once per second until terminate5 is set.
void func5()
{
    announce_until_stopped(terminate5, "T5 ");
}
// Demo: fill a four-slot ThreadGroup, show the "group full" error, then free
// slots with remove() and reuse one of them.
int main()
{
    // Pin the capacity to four so the walkthrough below behaves the same on
    // every machine instead of depending on the hardware thread count.
    ThreadGroup group(4);
    const auto id1 = group.insert(terminate1, func1);
    const auto id2 = group.insert(terminate2, func2);
    group.insert(terminate3, func3);
    group.insert(terminate4, func4);
    try
    {
        group.insert(terminate5, func5); // all four slots are busy, so this throws
    }
    catch (const std::exception &e)
    {
        std::cout<<"\n\n"<<e.what()<<"\n\n";
    }
    std::this_thread::sleep_for(std::chrono::seconds(3));
    terminate1 = true; // ask worker 1 to stop ...
    group.remove(id1); // ... so remove() joins it (flag is true)
    std::this_thread::sleep_for(std::chrono::seconds(3));
    group.remove(id2); // flag is still false, so this worker is detached
    group.insert(terminate5, func5); // reuses one of the freed slots
    std::this_thread::sleep_for(std::chrono::seconds(3));
    // Ask the remaining workers to stop before `group` is destroyed, so its
    // destructor joins them instead of detaching threads that would keep
    // printing while the process shuts down. Worker 2 is already detached; the
    // flag only lets its loop end, it is not waited for.
    terminate2 = true;
    terminate3 = true;
    terminate4 = true;
    terminate5 = true;
}
Upvotes: 1
Reputation: 2365
I managed to get the code to run without crashing by introducing a std::thread::joinable()
check. I'm still open to opinions on how to handle this more elegantly :)
// Reuse a fixed set of thread_count slots round-robin, joining a slot's
// previous thread before a new one is started in it (assigning to a
// std::thread that is still joinable calls std::terminate).
std::vector<std::thread> threads(thread_count);
while(accumulating_data) {
    if(buffer_full) {
        /* the following check returns false prior to an assignment */
        if(threads[thread_id].joinable()) {
            threads[thread_id].join();
        }
        threads[thread_id] = std::thread(&MyClass::myfunction, this);
        // Advance to the next slot, wrapping back to 0 after the last one.
        if(++thread_id == thread_count) { thread_id = 0; }
    }
}
Upvotes: 1