Adrian
Adrian

Reputation: 2365

How do I handle the repetitive starting of multiple threads?

Until now, when using threads, I've always started them right away in my program and then made them wait on notification from a main control thread.

std::vector<std::thread> threads;
for(int i = 0; i != thread_count; ++i) {
    threads.push_back(std::thread(&MyClass::myfunction, this));
}

/* some time later in the code */
for(auto& t : threads) {
    t.join();
}

Now I want to start threads on demand from a function run by my control thread, but I'm unsure how to handle the thread objects and their joining.

The following would push a new thread object onto the vector for each call, which strikes me as not ideal:

std::vector<std::thread> threads;
while(accumulating_data) {
    if(buffer_full) {
        threads.push_back(std::thread(&MyClass::myfunction, this));
    }
}

Having the vector hold no more than the maximum number on consecutively running threads seems preferable. I also don't know how to join the threads here without blocking the control thread.

If I do something like this instead:

// dummy code, in my real code I have a queue of idle IDs
std::vector<std::thread> threads(thread_count);
while(accumulating_data) {
    if(buffer_full) {
        threads[thread_id] = std::thread(&MyClass::myfunction, this);
        if(++thread_id == thread_count) { thread_id = 0; }
    }
}

...I quickly crash, probably because I haven't joined or am reassigning to a vector element that already contains a std::thread object.

Any hints on how I can accomplish my goal of starting threads on demand, instead of having them wait?

Update:

I managed to get the code to run without crashing by introducing a std::thread.joinable() check. I'm still open to opinions on how to handle this more elegantly, so I won't make it the answer to my own question:

std::vector<std::thread> threads(thread_count);
while(accumulating_data) {
    if(buffer_full) {
        if(threads[thread_id].joinable()) {
            threads[thread_id].join(); }
        }
        threads[thread_id] = std::thread(&MyClass::myfunction, this);
        if(++thread_id == thread_count) { thread_id = 0; }
    }
}

Upvotes: 0

Views: 243

Answers (2)

Brandon
Brandon

Reputation: 23500

Not sure if this is what you want..

#include <thread>
#include <tuple>
#include <vector>
#include <stdexcept>

class ThreadGroup
{
    private:
        std::uint32_t max_threads;
        std::vector<std::tuple<std::thread::native_handle_type, std::thread, bool*>> data;

    public:
        ThreadGroup() : max_threads(std::thread::hardware_concurrency()), data(max_threads) {}
        ThreadGroup(std::uint32_t max_threads) : max_threads(max_threads), data(max_threads) {}
        ~ThreadGroup();

        template<class Function, class... Args>
        std::thread::native_handle_type insert(bool &terminate, Function&& f, Args&&... args);
        bool remove(std::thread::native_handle_type id);
};

ThreadGroup::~ThreadGroup()
{
    for (auto &it : data)
    {
        if (std::get<0>(it))
        {
            if (!*std::get<2>(it))
            {
                std::get<1>(it).detach();
                continue;
            }

            std::get<1>(it).join();
        }
    }
}

template<class Function, class... Args>
std::thread::native_handle_type ThreadGroup::insert(bool &terminate, Function&& f, Args&&... args)
{
    int i = 0;
    for (auto &it : data)
    {
        if (std::get<0>(it) == 0)
        {
            auto &&t = std::thread(std::forward<Function>(f), std::forward(args)...);
            auto &&tup = std::make_tuple(t.native_handle(), std::forward<std::thread>(t), &terminate);
            data[i] = std::move(tup);
            return std::get<0>(data[i]);
        }
        ++i;
    }
    throw std::length_error("Maximum thread limit reached.");
}

bool ThreadGroup::remove(std::thread::native_handle_type id)
{
    for (auto it = data.begin(); it != data.end(); ++it)
    {
        if (std::get<0>(*it) == id)
        {
            if (std::get<1>(*it).joinable() && *std::get<2>(*it))
            {
                std::get<1>(*it).join();
                std::get<0>(*it) = 0;
                std::get<2>(*it) = nullptr;
                //data.erase(it);
                return true;
            }
            std::get<1>(*it).detach();
            std::get<0>(*it) = 0;
            std::get<2>(*it) = nullptr;
            //data.erase(it);
            return false;
        }
    }

    return false;
}

Then I used it like:

#include <chrono>
#include <iostream>
#include <thread>

bool terminate1 = false, terminate2 = false, terminate3 = false, terminate4 = false, terminate5 = false;

void func1()
{
    while(!terminate1)
    {
        std::cout<<"T1 ";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

void func2()
{
    while(!terminate2)
    {
        std::cout<<"T2 ";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

void func3()
{
    while(!terminate3)
    {
        std::cout<<"T3 ";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

void func4()
{
    while(!terminate4)
    {
        std::cout<<"T4 ";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

void func5()
{
    while(!terminate5)
    {
        std::cout<<"T5 ";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

int main()
{
    ThreadGroup group;
    auto id1 = group.insert(terminate1, func1);
    auto id2 = group.insert(terminate2, func2);
    auto id3 = group.insert(terminate3, func3);
    auto id4 = group.insert(terminate4, func4);

    try
    {
        auto id5 = group.insert(terminate5, func5); //limit in my case is 4. inserting 5 will throw max limit exception..
    }
    catch(std::exception &e)
    {
        std::cout<<"\n\n"<<e.what()<<"\n\n";
    }

    std::this_thread::sleep_for(std::chrono::seconds(3));
    terminate1 = true;  //allow the thread to join..
    group.remove(id1);  //joins if the thread is finished..

    std::this_thread::sleep_for(std::chrono::seconds(3));
    group.remove(id2);  //remove another thread (detaches if the thread isn't finished)..

    auto id5 = group.insert(terminate5, func5); //insert a new thread in any of the old slots..
    std::this_thread::sleep_for(std::chrono::seconds(3));
}

Upvotes: 1

Adrian
Adrian

Reputation: 2365

I managed to get the code to run without crashing by introducing a std::thread.joinable() check. I'm still open to opinions on how to handle this more elegantly :)

std::vector<std::thread> threads(thread_count);
while(accumulating_data) {
    if(buffer_full) {
        /* the following check returns false prior to an assignment */
        if(threads[thread_id].joinable()) {
            threads[thread_id].join(); }
        }
        threads[thread_id] = std::thread(&MyClass::myfunction, this);
        if(++thread_id == thread_count) { thread_id = 0; }
    }
}

Upvotes: 1

Related Questions