user3854783
user3854783

Reputation:

Boost threads and shared_ptrs

I am working on a side project with friends and am building a thread pool with Boost threads and asio. I already have a basic pool up and running but wanted to be able to delete threads from the pool. The class below keeps a vector of std::shared_ptr<boost::thread>.

The issue I am having is when I run a simple test, the program never exits. It appears to reach the end (see output) but never terminates. I have a feeling it has to do with the threads still being alive and have tried to kill them but to no avail. I did some searches on clearing a vector of shared pointers and it seems like you really shouldn't have to. As well, the thread_group call to join_all() should join and end all threads, at least i thought so.

I am still fully learning boost threads, asio, and shared_ptr's so any help or advice would be greatly appreciated.

Compile:`

g++ -std=c++14 main.cpp -lboost_system -lboost_thread -lpthread

Class:

  class experimentalPool {
      private:
        boost::asio::io_service ioService;
        boost::asio::io_service::work work;
        boost::thread_group threads;
        std::vector<std::shared_ptr<boost::thread>> workers;
        std::size_t poolsize;
      public:
        experimentalPool(): work( ioService ) {
            poolsize = boost::thread::hardware_concurrency();
            for (size_t i = 0; i < poolsize; i++) {
                std::shared_ptr<boost::thread> t1(new boost::thread(
                    boost::bind(&boost::asio::io_service::run, &ioService)));
                threads.add_thread(t1.get());
                workers.push_back(std::move(t1));
            }
        }

        experimentalPool( std::size_t psize )
          : work( ioService ),
            poolsize( psize )
        {
            for (size_t i = 0; i < poolsize; i++) {
                std::shared_ptr<boost::thread> t1(new boost::thread(
                    boost::bind(&boost::asio::io_service::run, &ioService)));
                threads.add_thread(t1.get());
                workers.push_back(std::move(t1));
            }
        }

        template <typename F, typename... Args>
        void add_work(F f, Args... args){
            ioService.post(boost::bind(f, args...));
        };

        ~experimentalPool(){
          ioService.stop();
          try {
            std::cout << "here" << "\n";
            threads.join_all();
            //threads.interrupt_all();
            std::cout << "here" << "\n";
            //for (size_t i = 0; i < workers.size(); i++) {
              //(*workers[i]).interrupt();
            //}
          }
          catch ( const std::exception& ) {std::cout << "Caught Exception" << "\n";}
        }

        std::size_t size() const { return poolsize;}
    };

Main:

void add1(int& num){
    std::cout << ++num << std::endl;
}

int main(){

    int temp = 5;

    experimentalPool xpool(2);
    std::cout << "Xpool " << pool2.size() << "\n";
    xpool.add_work(add1, temp);

    std::this_thread::sleep_for (std::chrono::seconds(1));
    std::cout << "End" << std::endl;
    return 0;
}

Output:

Xpool 1
6
here
xpool here
xpool here
^C
ap@ubuntu:~/Desktop$ 

Upvotes: 1

Views: 904

Answers (2)

sehe
sehe

Reputation: 393709

To be honest I looked at the code for a good few minutes and got more and more concerned.

Why are threads in two collections (thread_group and the vector)? Why are threads owned in another collection? Why is the work object permanent? Why are you axing the service instead of letting the pool drain?

Why is poolsize redundantly stored?

Why is the constructor 99% duplicated?

Why is add1 taking a mutable reference, but the bind is by value?

Simplify

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <thread>

namespace Experimental { 
    namespace ba = boost::asio;

    class Pool {
      private:
        ba::io_service ioService;
        boost::thread_group threads;
        boost::optional<ba::io_service::work> work { ioService };

      public:
        Pool(std::size_t psize = boost::thread::hardware_concurrency()) {
            for (size_t i = 0; i < psize; i++) {
                threads.create_thread(boost::bind(&ba::io_service::run, &ioService));
            }
        }

        template <typename F, typename... Args> void add_work(F f, Args... args) {
            ioService.post(boost::bind(f, args...));
        }

        ~Pool() {
            work.reset();
            threads.join_all();
        }

        std::size_t size() const { return threads.size(); }
    };
}

void add1(int &num) { std::cout << ++num << std::endl; }

int main() {
    int temp = 5;

    Experimental::Pool xpool(2);
    std::cout << "Xpool " << xpool.size() << "\n";
    xpool.add_work(add1, std::ref(temp));

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "End" << std::endl;
}

Prints

Xpool 2
6
End
temp: 6

And it is only ~50% of the code.

Upvotes: 1

llllllllll
llllllllll

Reputation: 16434

You fatal problem is here:

std::shared_ptr<boost::thread> t1(new boost::thread(
    boost::bind(&boost::asio::io_service::run, &ioService)));
threads.add_thread(t1.get());

From the documentation of boost::thread_group(http://www.boost.org/doc/libs/1_42_0/doc/html/thread/thread_management.html#thread.thread_management.threadgroup.destructor), the effect of its destructor is

Destroy *this and delete all boost::thread objects in the group.

Thus boost::thread_group will exclusively participate in the resource management of its threads, you cannot store those threads in a shared_ptr.

To solve it, you need to read the linked documentation for boost::thread_group carefully, use create_thread() and store it as a raw pointer instead.

Upvotes: 0

Related Questions