user1235183

Reputation: 3072

C++ ThreadPool is not running parallel

I've tried to implement a ThreadPool, but unfortunately I'm running into some problems.

This is what I have so far.

//includes ...

void call()
{
    std::cout << "Hi i'm thread no " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "ready " << std::this_thread::get_id() << std::endl;
};

//Implementation is not shown here to reduce code
class WorkQueue {
    public:
        bool push(std::function<void()> const& value);
        void pop();
        bool empty();
};

std::condition_variable g_queuecheck;
std::mutex              g_lockqueue;
std::atomic<bool>       g_notified;

class ThreadPool
{
    public:
        ThreadPool(int iNoThread) :
            m_noThread(iNoThread)
        {
            g_notified.store(false);
            m_threads.resize(iNoThread);
            bIsReady.store(false);
            for (int i = 0; i < iNoThread; ++i)
                m_threads[i] = std::thread(&ThreadPool::run, this);
        }

        void run()
        {
            while (!bIsReady || !m_workQueue.empty())
            {
                std::unique_lock<std::mutex> locker(g_lockqueue);
                if (m_workQueue.empty())
                {
                    while (!g_notified) // Used to avoid spurious wakeups
                    {
                        g_queuecheck.wait(locker);
                    }
                    if(!bIsReady)
                        g_notified.store(false);
                }

                m_workQueue.pop();
            }
        };

        void addWork(std::function<void()> func)
        {
            m_workQueue.push(func);
            g_notified.store(true);
            g_queuecheck.notify_one();
        }

        void join()
        {
            bIsReady.store(true);
            g_notified.store(true);
            g_queuecheck.notify_all();

            for (int i = 0; i < m_noThread; ++i)
                m_threads[i].join();
        }

        ~ThreadPool()
        {}


        WorkQueue m_workQueue;
        int m_noThread;
        std::vector<std::thread> m_threads;
        std::atomic<bool> bIsReady;
};

int _tmain(int argc, _TCHAR* argv[])
{
    {
        ThreadPool pool(4);

        for (int i = 0; i < 8; ++i)
            pool.addWork(call); //This work is done sequentially

        pool.join();
    }

    std::cin.ignore();
    return 0;
}

My problem is that the work is done sequentially.

  1. How can I fix this?
  2. Is something else wrong with my ThreadPool?
  3. Is the waiting best-practice?

Upvotes: 2

Views: 1524

Answers (3)

DannyK

Reputation: 1392

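I use boost::asio to implement a thread pool; hope this helps. The implementation is adapted from the Asio Thread Pool. The key to getting the example to work was scoping the asio::io_service::work object and calling join_all outside that scope: once the work object is destroyed, io_service::run returns as soon as the posted jobs have finished, so the threads can be joined.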

#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>       // boost::bind is used below
#include <boost/scoped_ptr.hpp>
#include <iostream>


boost::mutex output_mutex;
void call(size_t job_number)
{
    {
       boost::mutex::scoped_lock print_lock(output_mutex);
       std::cout << "Hi, I'm job " << job_number << " and thread: " << boost::this_thread::get_id() << std::endl;
    }

    boost::this_thread::sleep_for(boost::chrono::seconds(2));
    {
       boost::mutex::scoped_lock print_lock(output_mutex);
       std::cout << "job " << job_number << " finished. thread " << boost::this_thread::get_id() << " ready." << std::endl;
    }
};

int main(int argc, char **argv)
{
    size_t number_of_threads = boost::thread::hardware_concurrency();

    // the number of jobs does not have to equal the number of
    // threads.  they will be processed in turn.
    size_t number_of_jobs = 3 * number_of_threads;

    boost::asio::io_service io_service;
    boost::thread_group threads;
    {
        boost::scoped_ptr< boost::asio::io_service::work > work( new boost::asio::io_service::work(io_service) );
        for(size_t t = 0; t < number_of_threads; t++)
        {
            threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
        }

        // post the jobs for work
        // notice that the thread id is reused
        for( size_t t = 0; t < number_of_jobs; t++ )
        {
            io_service.post(boost::bind(call,t) );
        }
    }
    threads.join_all();

    return 0;
}

Upvotes: 7

Useless

Reputation: 67852

  1. How can I fix this?

You don't show any work actually being done, only pop being called. Assuming pop really executes the function, note that your std::unique_lock is still in scope at that point, so every call runs while holding the mutex and the calls are serialized. You need to release the lock before calling the function for the calls to run concurrently, which means locker must be out of scope by then.

For example,

    { // begin locker scope
        std::unique_lock<std::mutex> locker(g_lockqueue);
        if (m_workQueue.empty())
        {

            while (!g_notified) // used to avoid spurious wakeups 
            {
                g_queuecheck.wait(locker);
            }
            if(!bIsReady)
                g_notified.store(false);
        }
    } // end locker scope
    m_workQueue.pop();

Note that now, your pop method will run the functions in parallel, but it will also be mutating the queue in parallel. This is a problem. You'll need to do something like: move the function into a local variable, and then pop, while holding the mutex; then call the function outside the locker scope.
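
As a rough sketch of that last step (not the asker's WorkQueue, whose implementation isn't shown): assuming the jobs live in a plain std::queue<std::function<void()>> guarded by a std::mutex, a helper along these lines takes one job under the lock and runs it after the lock is released. The name run_next is made up for illustration.

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical helper: removes one job from the queue while holding the
// mutex, then runs it after the lock has been released.
// Returns false if the queue was empty.
bool run_next(std::queue<std::function<void()>>& jobs, std::mutex& m)
{
    std::function<void()> job;
    {
        std::lock_guard<std::mutex> lock(m);
        if (jobs.empty())
            return false;
        job = std::move(jobs.front()); // take ownership under the lock
        jobs.pop();                    // mutate the queue under the lock
    } // lock released here
    job(); // runs without holding the mutex
    return true;
}
```

Because the mutex is only held while the queue is touched, several threads calling a helper like this can execute their jobs at the same time.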

  2. Is something else wrong with my ThreadPool?

You're using globals, which is ugly, and your variables are poorly named: it isn't at all clear what g_notified and bIsReady are intended to mean.

  3. Is the waiting best-practice?

No. Even apart from the actual bug, its intent is obscure.

Upvotes: 0

alain

Reputation: 12057

The problem is in the run function: g_lockqueue is held for too long. It stays locked for as long as locker is in scope, so g_lockqueue is still locked when pop is called.
However, because pop and empty must not be executed concurrently, you need to take the work out of the queue while the mutex is locked, then release the lock, and only then execute the work.

run could look like this:

void run()
{
    while (!bIsReady || !m_workQueue.empty())
    {
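        std::function<void()> work;
        {
            std::unique_lock<std::mutex> locker(g_lockqueue);
            if (m_workQueue.empty())
            {
                while (!g_notified) // used to avoid spurious wakeups
                {
                    g_queuecheck.wait(locker);
                }
                if(!bIsReady)
                    g_notified.store(false);
            }
            if (m_workQueue.empty())
                continue; // nothing to take (another worker got it, or shutting down): recheck the loop condition
            // assumes pop() is changed to return the removed function
            work = m_workQueue.pop();  // get the work to be done while locked
        } // g_lockqueue released here
        work();  // do the work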
    }
};

Upvotes: 0
