Marcin
Marcin

Reputation: 329

How to safely destruct Posix thread pool in C++

I am trying to implement a thread pool in C++ using pthreads. I want to encapsulate the thread-management logic in a single object that takes ownership of the threads. That means that whenever this object is destroyed, the threads must be stopped and cleaned up.

What is the best way to stop and destroy threads? Is detaching at start and cancellation at stop a good solution? Or maybe it is better to cancel and join threads? See my code, I'll appreciate any relevant comments.

WorkerThreadManager.h:

#include "WorkerThreadManagerInterface.h"
#include "utils/mutex.h"
#include <queue>
#include <semaphore.h>

#include <iostream>

/**
 * @brief Pool of POSIX worker threads that execute queued tasks.
 *
 * Tasks are added with PushTask(); each task is a pair of functions:
 * a thread function run by a worker thread and a result function that is
 * queued, bound to the thread function's return value, once the task is done.
 * Queued result functions are executed by whoever calls SignalResults().
 *
 * The object owns the thread handles and the task semaphore, so it must not
 * be copied: a copy would share (and later double-release) both resources.
 */
class WorkerThreadManager : public WorkerThreadManagerInterface
{
    public:
        /**
         * @brief Initializes the task semaphore and starts the worker threads.
         * @param threadsNumber number of worker threads to start
         */
        WorkerThreadManager(unsigned threadsNumber = 5);
        virtual ~WorkerThreadManager();

        /**
         * @brief Queues a task for execution by a worker thread.
         * @param A_threadFun function executed by the worker thread
         * @param A_resultFun function queued with the return value of A_threadFun
         */
        virtual void    PushTask(thread_function_t A_threadFun, result_function_t A_resultFun);

        /// Executes all result functions queued so far, in the caller's thread.
        void    SignalResults();

    private:
        // Non-copyable (C++03 idiom): declared private and intentionally left
        // undefined, because the class owns m_pthreads and m_tasksSem.
        WorkerThreadManager(const WorkerThreadManager&);
        WorkerThreadManager& operator=(const WorkerThreadManager&);

        /// Thread entry point; A_data is the owning WorkerThreadManager.
        static void*    WorkerThread(void* A_data);

        /// Queues A_resultFun bound to A_result for a later SignalResults() call.
        void    PushResult(int A_result, result_function_t A_resultFun);

        /// Result function already bound to its argument, ready to be invoked.
        typedef boost::function<void ()> signal_function_t;

        /// One queued task: the function to run and the function receiving its result.
        struct worker_thread_data_t
        {
            worker_thread_data_t(thread_function_t A_threadFun, result_function_t A_resultFun) :
                threadFun(A_threadFun), resultFun(A_resultFun) {}
            worker_thread_data_t() {}

            thread_function_t       threadFun;
            result_function_t       resultFun;
        };


        const unsigned                      m_threadsNumber;    ///< number of worker threads
        pthread_t*                          m_pthreads;         ///< array of m_threadsNumber thread handles

        utils::Mutex                        m_tasksMutex;       ///< guards m_tasks
        sem_t                               m_tasksSem;         ///< posted once per queued task
        std::queue<worker_thread_data_t>    m_tasks;            ///< tasks waiting for a worker

        utils::Mutex                        m_resultsMutex;     ///< guards m_results
        std::queue<signal_function_t>       m_results;          ///< results waiting for SignalResults()
};

WorkerThreadManager.cpp:

#include "WorkerThreadManager.h"
#include "gateway_log.h"
#include <pthread.h>

/**
 * @brief Creates semaphore and starts threads.
 */
WorkerThreadManager::WorkerThreadManager(unsigned threadsNumber) : m_threadsNumber(threadsNumber)
{
    if ( sem_init(&m_tasksSem, 0, 0) )
    {
        std::stringstream ss;
        ss << "Semaphore could not be initialized: " << errno << " - " << strerror(errno);
        LOG_FATAL(ss);
        throw std::runtime_error(ss.str());
    }

    m_pthreads = new pthread_t[m_threadsNumber];
    for (unsigned i = 0; i < m_threadsNumber; ++i)
    {
        int rc = pthread_create(&m_pthreads[i], NULL, WorkerThreadManager::WorkerThread, (void*) this );
        if(rc)
        {
            std::stringstream ss;
            ss << "Pthread could not be started: " << errno << " - " << strerror(errno);
            LOG_FATAL(ss.str());

            if ( sem_destroy(&m_tasksSem) )
                LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));

            delete [] m_pthreads;

            throw std::runtime_error(ss.str());
        }
        else
        {
            LOG_DEBUG("Worker thread started " << m_pthreads[i]);

            if(pthread_detach(m_pthreads[i]))
                LOG_WARN("Failed to detach worker thread");
        }
    }
}

/**
 * @brief Cancels all threads, destroys semaphore
 */
WorkerThreadManager::~WorkerThreadManager()
{
    LOG_DEBUG("~WorkerThreadManager()");

    for(unsigned i = 0; i < m_threadsNumber; ++i)
    {
        if ( pthread_cancel(m_pthreads[i]) )
            LOG_ERROR("Worker thread cancellation failed");
    }

    if ( sem_destroy(&m_tasksSem) )
        LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));

    delete [] m_pthreads;
}

/**
 * @brief Adds new task to queue, so worker threads can pick it up and execute it.
 * @param A_threadFun function which will be executed by thread
 * @param A_resultFun function which will be enqueued for calling with return value of A_threadFun as parameter
 *          after worker thread executes A_threadFun.
 */
void WorkerThreadManager::PushTask(thread_function_t A_threadFun, result_function_t A_resultFun)
{
    utils::ScopedLock mutex(m_tasksMutex);

    worker_thread_data_t    data(A_threadFun, A_resultFun);
    m_tasks.push( data );

    // Wake one worker. A failed post means no worker is woken for this task,
    // so report it instead of ignoring the return value.
    if ( sem_post(&m_tasksSem) )
        LOG_ERROR("Semaphore could not be posted: " << errno << " - " << strerror(errno));

    LOG_DEBUG("Task for worker threads has been added to queue");
}

/**
 * @brief   Executes result functions (if there are any) to give feedback 
 *  to classes which requested task execution in worker thread.
 */
void WorkerThreadManager::SignalResults()
{
    while(true)
    {
        signal_function_t signal;
        {
            utils::ScopedLock mutex(m_resultsMutex);
            if(m_results.size())
            {
                signal = m_results.front();
                m_results.pop();
            }
            else
                return;
        }

        signal();
    }
}

/**
 * @brief Enqueues result of function executed in worker thread.
 * @param A_result return value of function executed in worker thread
 * @param A_resultFun function which will be enqueued for calling with A_result as a parameter.
 */
void WorkerThreadManager::PushResult(int A_result, result_function_t A_resultFun)
{
    utils::ScopedLock mutex(m_resultsMutex);

    signal_function_t signal = boost::bind(A_resultFun, A_result);
    m_results.push( signal );
}


/**
 * @brief   worker thread body
 *
 * Loops forever: waits on the task semaphore, takes one task from the task
 * queue, runs its thread function and queues the result function together
 * with the returned value.
 *
 * @param A_data pointer to WorkerThreadManager instance
 * @return NULL (the loop itself never terminates)
 */
void* WorkerThreadManager::WorkerThread(void* A_data)
{
    WorkerThreadManager* manager = static_cast<WorkerThreadManager*>(A_data);
    LOG_DEBUG("Starting worker thread loop");
    while (1)
    {
        if ( -1 == sem_wait(&manager->m_tasksSem) && errno == EINTR )
        {
            LOG_DEBUG("sem_wait interrupted with signal");
            continue;
        }
        LOG_DEBUG("WorkerThread:::::: about to call lock mutex");

        worker_thread_data_t data;
        {
            utils::ScopedLock mutex(manager->m_tasksMutex);

            // front()/pop() on an empty queue is undefined behaviour; this can
            // only be reached if sem_wait returned without a matching task.
            if (manager->m_tasks.empty())
                continue;

            data = manager->m_tasks.front();
            // Remove the task just taken from the *task* queue, which is the
            // queue protected by the mutex held here.
            manager->m_tasks.pop();
        }

        // Note: the log texts below say "resultFun", but the call made
        // between them is the task's thread function.
        LOG_DEBUG("WorkerThread:::::: about to call resultFun");
        int result  = data.threadFun();
        LOG_DEBUG("WorkerThread:::::: after call resultFun");

        // Explicit cancellation point between running the task and publishing its result.
        pthread_testcancel();

        manager->PushResult(result, data.resultFun);
    }

    return NULL;
}

main.cpp:

#include "gateway_log.h"
#include "WorkerThreadManager.h"
#include <memory>

/// Demo client: provides a slow task and a callback that receives its result.
class A {
public:
    /// Logs, sleeps for 8 seconds, logs again and returns 0.
    int Fun()
    {
        LOG_DEBUG("Fun before sleep");
        sleep(8);
        LOG_DEBUG("Fun after sleep");
        return 0;
    }

    /// Logs the value passed in.
    void Result(int a)
    {
        LOG_DEBUG("Result: " << a);
    }
};


int main()
{
    sd::auto_ptr<WorkerThreadManager> workerThreadManager = new WorkerThreadManager;
    A a;
    workerThreadManager->PushTask(boost::bind(&A::Fun, &a), boost::bind(&A::Result, &a, _1));
    sleep(3);
    LOG_DEBUG("deleting workerThreadManager");
    workerThreadManager.reset();                    // <<<--- CRASH
    LOG_DEBUG("deleted workerThreadManager");
    sleep(10);
    LOG_DEBUG("after sleep");    

    return 0;
}

Please note that there is a problem with this code described here.

Upvotes: 2

Views: 847

Answers (1)

PiotrNycz
PiotrNycz

Reputation: 24402

Regarding safe stop: I prefer pthread_join. I do not use pthread_cancel; instead I use a special stop message, because my threads are always event-driven (that is, each thread has a queue of messages). When a thread receives the exit message it leaves its loop, and then the join returns in my main code.

Regarding your code: I would recommend creating a class Thread that encapsulates a single thread. The pool should hold Thread objects created on the heap, just as you now have an array of pthread_t. If you need synchronization between the pool and its threads, then you must not leave the pool's destructor until you are sure the Thread objects have been destroyed.

Upvotes: 2

Related Questions