Reputation: 3072
I've tried to implement a thread pool, but I'm running into a problem.
This is what I have so far:
//includes ...
void call()
{
std::cout << "Hi i'm thread no " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "ready " << std::this_thread::get_id() << std::endl;
};
//Implementation is not shown here to reduce code
class WorkQueue {
public:
bool push(std::function<void()> const& value);
void pop();
bool empty();
};
std::condition_variable g_queuecheck;
std::mutex g_lockqueue;
std::atomic<bool> g_notified;
class ThreadPool
{
public:
ThreadPool(int iNoThread) :
m_noThread(iNoThread)
{
g_notified.store(false);
m_threads.resize(iNoThread);
bIsReady.store(false);
for (int i = 0; i < iNoThread; ++i)
m_threads[i] = std::thread(&ThreadPool::run, this);
}
void run()
{
while (!bIsReady || !m_workQueue.empty())
{
std::unique_lock<std::mutex> locker(g_lockqueue);
if (m_workQueue.empty())
{
while (!g_notified) // Used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
if(!bIsReady)
g_notified.store(false);
}
m_workQueue.pop();
}
};
void addWork(std::function<void()> func)
{
m_workQueue.push(func);
g_notified.store(true);
g_queuecheck.notify_one();
}
void join()
{
bIsReady.store(true);
g_notified.store(true);
g_queuecheck.notify_all();
for (int i = 0; i < m_noThread; ++i)
m_threads[i].join();
}
~ThreadPool()
{}
WorkQueue m_workQueue;
int m_noThread;
std::vector<std::thread> m_threads;
std::atomic<bool> bIsReady;
};
int _tmain(int argc, _TCHAR* argv[])
{
{
ThreadPool pool(4);
for (int i = 0; i < 8; ++i)
pool.addWork(call); //This work is done sequentially
pool.join();
}
std::cin.ignore();
return 0;
}
My problem is that the work is done sequentially.
Upvotes: 2
Views: 1524
Reputation: 1392
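I use boost::asio to implement a thread pool; I hope this helps. This implementation was gleaned from the Asio Thread Pool. The key to getting the example to work, for me, was scoping the asio::io_service::work object and calling join_all outside that scope: once the work object is destroyed, io_service::run returns as soon as the posted jobs have finished, so the threads can be joined.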
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/scoped_ptr.hpp>
#include <iostream>
boost::mutex output_mutex;
void call(size_t job_number)
{
{
boost::mutex::scoped_lock print_lock(output_mutex);
std::cout << "Hi, I'm job " << job_number << " and thread: " << boost::this_thread::get_id() << std::endl;
}
boost::this_thread::sleep_for(boost::chrono::seconds(2));
{
boost::mutex::scoped_lock print_lock(output_mutex);
std::cout << "job " << job_number << " finished. thread " << boost::this_thread::get_id() << " ready." << std::endl;
}
};
int main(int argc, char **argv)
{
size_t number_of_threads = boost::thread::hardware_concurrency();
// the number of jobs does not have to equal the number of
// threads. they will be processed in turn.
size_t number_of_jobs = 3 * number_of_threads;
boost::asio::io_service io_service;
boost::thread_group threads;
{
boost::scoped_ptr< boost::asio::io_service::work > work( new boost::asio::io_service::work(io_service) );
for(size_t t = 0; t < number_of_threads; t++)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
// post the jobs for work
// notice that the thread id is reused
for( size_t t = 0; t < number_of_jobs; t++ )
{
io_service.post(boost::bind(call,t) );
}
}
threads.join_all();
return 0;
}
Upvotes: 7
Reputation: 67852
- How can I fix this?
You don't show any work actually being done, only pop being called. Assuming that pop really executes the function, note that your std::unique_lock is still in scope when you do this, so the calls are serialized. You need to release the lock before the calls can run concurrently, which means that locker must be out of scope.
For example,
{ // begin locker scope
    std::unique_lock<std::mutex> locker(g_lockqueue);
    if (m_workQueue.empty())
    {
        while (!g_notified) // used to avoid spurious wakeups
        {
            g_queuecheck.wait(locker);
        }
        if (!bIsReady)
            g_notified.store(false);
    }
} // end locker scope
m_workQueue.pop();
Note that now, your pop method will run the functions in parallel, but it will also be mutating the queue in parallel. This is a problem. You'll need to do something like: move the function into a local variable, and then pop, while holding the mutex; then call the function outside the locker scope.
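The "move into a local variable, pop under the mutex, call outside the lock" pattern could be sketched roughly as follows. This is only an illustration: TaskQueue, try_pop and drain are hypothetical names, standing in for the question's WorkQueue, whose implementation is not shown.

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for the question's WorkQueue (its implementation is not shown).
class TaskQueue {
public:
    void push(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_tasks.push(std::move(task));
    }

    // Moves the front task into `out` and removes it from the queue,
    // all while holding the mutex. Returns false if the queue was empty.
    bool try_pop(std::function<void()>& out) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_tasks.empty())
            return false;
        out = std::move(m_tasks.front());
        m_tasks.pop();
        return true;
    }

private:
    std::mutex m_mutex;
    std::queue<std::function<void()>> m_tasks;
};

// Runs queued tasks until the queue is empty and returns how many were run.
// The mutex is held only inside try_pop, never while a task executes.
int drain(TaskQueue& queue) {
    int executed = 0;
    std::function<void()> task;
    while (queue.try_pop(task)) {
        task(); // called outside the lock, so other threads can pop meanwhile
        ++executed;
    }
    return executed;
}
```

Because the mutex is released before task() is called, several threads running drain on the same queue can execute their tasks at the same time, while the queue itself is only ever modified under the lock.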
- Is something else wrong with my ThreadPool?
You're using globals, which is ugly, and your variables are poorly named: it isn't at all clear what g_notified and bIsReady are intended to mean.
- Is the waiting best-practice?
No. Even apart from the actual bug, its intent is obscure.
Upvotes: 0
Reputation: 12057
The problem is in the run function: g_lockqueue is locked for too long. It stays locked as long as locker is in scope, so g_lockqueue is still locked when pop is called.
But because pop and empty must not be executed concurrently, you need to take the work out of the queue while locked, then release the lock, and finally execute the work.
run could look like this:
void run()
{
    while (!bIsReady || !m_workQueue.empty())
    {
        std::function<void()> work;
        {
            std::unique_lock<std::mutex> locker(g_lockqueue);
            if (m_workQueue.empty())
            {
                while (!g_notified) // used to avoid spurious wakeups
                {
                    g_queuecheck.wait(locker);
                }
                if (!bIsReady)
                    g_notified.store(false);
            }
            // assumes pop() is changed to return the removed function
            // (the question declares it as void pop())
            work = m_workQueue.pop(); // get the work to be done while locked
        } // g_lockqueue released here
        if (work)
            work(); // do the work outside the lock
    }
}
Upvotes: 0