Reputation: 329
I am trying to implement thread pool in C++ using pthread. I want to encapsulate logic related to threads management in one object which is taking ownership of these threads. That means whenever this object is destroyed, threads must be stopped and cleaned up.
What is the best way to stop and destroy threads? Is detaching at start and cancellation at stop a good solution? Or maybe it is better to cancel and join threads? See my code, I'll appreciate any relevant comments.
WorkerThreadManager.h:
#include "WorkerThreadManagerInterface.h"
#include "utils/mutex.h"
#include <queue>
#include <semaphore.h>
#include <iostream>
class WorkerThreadManager : public WorkerThreadManagerInterface
{
public:
WorkerThreadManager(unsigned threadsNumber = 5);
virtual ~WorkerThreadManager();
virtual void PushTask(thread_function_t A_threadFun, result_function_t A_resultFun);
void SignalResults();
private:
static void* WorkerThread(void* A_data);
void PushResult(int A_result, result_function_t A_resultFun);
typedef boost::function<void ()> signal_function_t;
struct worker_thread_data_t
{
worker_thread_data_t(thread_function_t A_threadFun, result_function_t A_resultFun) :
threadFun(A_threadFun), resultFun(A_resultFun) {}
worker_thread_data_t() {}
thread_function_t threadFun;
result_function_t resultFun;
};
const unsigned m_threadsNumber;
pthread_t* m_pthreads;
utils::Mutex m_tasksMutex;
sem_t m_tasksSem;
std::queue<worker_thread_data_t> m_tasks;
utils::Mutex m_resultsMutex;
std::queue<signal_function_t> m_results;
};
WorkerThreadManager.cpp:
#include "WorkerThreadManager.h"
#include "gateway_log.h"
#include <pthread.h>
/**
* @brief Creates semaphore and starts threads.
*/
WorkerThreadManager::WorkerThreadManager(unsigned threadsNumber) : m_threadsNumber(threadsNumber)
{
if ( sem_init(&m_tasksSem, 0, 0) )
{
std::stringstream ss;
ss << "Semaphore could not be initialized: " << errno << " - " << strerror(errno);
LOG_FATAL(ss);
throw std::runtime_error(ss.str());
}
m_pthreads = new pthread_t[m_threadsNumber];
for (unsigned i = 0; i < m_threadsNumber; ++i)
{
int rc = pthread_create(&m_pthreads[i], NULL, WorkerThreadManager::WorkerThread, (void*) this );
if(rc)
{
std::stringstream ss;
ss << "Pthread could not be started: " << errno << " - " << strerror(errno);
LOG_FATAL(ss.str());
if ( sem_destroy(&m_tasksSem) )
LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));
delete [] m_pthreads;
throw std::runtime_error(ss.str());
}
else
{
LOG_DEBUG("Worker thread started " << m_pthreads[i]);
if(pthread_detach(m_pthreads[i]))
LOG_WARN("Failed to detach worker thread");
}
}
}
/**
* @brief Cancels all threads, destroys semaphore
*/
WorkerThreadManager::~WorkerThreadManager()
{
LOG_DEBUG("~WorkerThreadManager()");
for(unsigned i = 0; i < m_threadsNumber; ++i)
{
if ( pthread_cancel(m_pthreads[i]) )
LOG_ERROR("Worker thread cancellation failed");
}
if ( sem_destroy(&m_tasksSem) )
LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));
delete [] m_pthreads;
}
/**
* @brief Adds new task to queue, so worker threads can
* @param A_threadFun function which will be executed by thread
* @param A_resultFun function which will be enqueued for calling with return value of A_threadFun as parameter
* after worker thread executes A_threadFun.
*/
void WorkerThreadManager::PushTask(thread_function_t A_threadFun, result_function_t A_resultFun)
{
utils::ScopedLock mutex(m_tasksMutex);
worker_thread_data_t data(A_threadFun, A_resultFun);
m_tasks.push( data );
sem_post(&m_tasksSem);
LOG_DEBUG("Task for worker threads has been added to queue");
}
/**
* @brief Executes result functions (if there are any) to give feedback
* to classes which requested task execution in worker thread.
*/
void WorkerThreadManager::SignalResults()
{
while(true)
{
signal_function_t signal;
{
utils::ScopedLock mutex(m_resultsMutex);
if(m_results.size())
{
signal = m_results.front();
m_results.pop();
}
else
return;
}
signal();
}
}
/**
* @brief Enqueues result of function executed in worker thread.
* @param A_result return value of function executed in worker thread
* @param A_resultFun function which will be enqueued for calling with A_result as a parameter.
*/
void WorkerThreadManager::PushResult(int A_result, result_function_t A_resultFun)
{
utils::ScopedLock mutex(m_resultsMutex);
signal_function_t signal = boost::bind(A_resultFun, A_result);
m_results.push( signal );
}
/**
* @brief worker thread body
* @param A_data pointer to WorkerThreadManager instance
*/
void* WorkerThreadManager::WorkerThread(void* A_data)
{
WorkerThreadManager* manager = reinterpret_cast<WorkerThreadManager*>(A_data);
LOG_DEBUG("Starting worker thread loop");
while (1)
{
if ( -1 == sem_wait(&manager->m_tasksSem) && errno == EINTR )
{
LOG_DEBUG("sem_wait interrupted with signal");
continue;
}
LOG_DEBUG("WorkerThread:::::: about to call lock mutex");
worker_thread_data_t data;
{
utils::ScopedLock mutex(manager->m_tasksMutex);
data = manager->m_tasks.front();
manager->m_results.pop();
}
LOG_DEBUG("WorkerThread:::::: about to call resultFun");
int result = data.threadFun();
LOG_DEBUG("WorkerThread:::::: after call resultFun");
pthread_testcancel();
manager->PushResult(result, data.resultFun);
}
return NULL;
}
main.cpp:
#include "gateway_log.h"
#include "WorkerThreadManager.h"
#include <memory>
class A {
public:
int Fun() { LOG_DEBUG("Fun before sleep"); sleep(8); LOG_DEBUG("Fun after sleep");return 0; }
void Result(int a) { LOG_DEBUG("Result: " << a); }
};
int main()
{
sd::auto_ptr<WorkerThreadManager> workerThreadManager = new WorkerThreadManager;
A a;
workerThreadManager->PushTask(boost::bind(&A::Fun, &a), boost::bind(&A::Result, &a, _1));
sleep(3);
LOG_DEBUG("deleting workerThreadManager");
workerThreadManager.reset(); // <<<--- CRASH
LOG_DEBUG("deleted workerThreadManager");
sleep(10);
LOG_DEBUG("after sleep");
return 0;
}
Please note that there is a problem with this code described here.
Upvotes: 2
Views: 847
Reputation: 24402
Regarding safe stop: I prefer pthread_join
. I do not use pthread_cancel
- I am using special stop message, but I always have event-driven thread (mean thread with some queue of messages). When thread get exit-message
it stops its loop and then join returns to my main
code.
Regarding your code - I would recommend to create class Thread
encapsulating single thread. Pool shall have Thread
objects created on heap - like now you have array of pthread_t
. IF you need synchronization between pool and threads - then you cannot exit pool destructor without being sure Thread
objects are destroyed.
Upvotes: 2