Maxence Gama

Reputation: 11

pthread_cond_broadcast not waking up threads waiting with pthread_cond_wait

I'm facing a problem with my threads. I'm writing a program that needs to execute asynchronous tasks, and I do so with a thread pool.

To keep my threads from spinning when they have nothing to do, I block them with pthread_cond_wait; when a new task arrives, I wake one of them up with pthread_cond_signal.
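To make the wait/signal pattern concrete, here is a minimal sketch separate from my pool. The names WorkSignal, waiter and runWaitSignalDemo are made up for this illustration. The waiter re-checks its predicate in a loop, so it behaves the same whether the signal arrives before or after it starts waiting.

```cpp
#include <pthread.h>
#include <cstddef>

// Hypothetical names for illustration; this is not the ThreadPool class.
struct WorkSignal {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             pending;  // items waiting to be handled (the predicate)
    int             handled;  // items the waiter has consumed
};

static void* waiter(void* arg)
{
    WorkSignal* s = static_cast<WorkSignal*>(arg);

    pthread_mutex_lock(&s->mutex);
    // Always loop on the predicate: pthread_cond_wait may wake spuriously,
    // and the signal may have been sent before this thread started waiting.
    while (s->pending == 0)
        pthread_cond_wait(&s->cond, &s->mutex);
    s->handled += s->pending;
    s->pending = 0;
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}

// Starts one waiter, hands it one item, and returns how many were handled.
int runWaitSignalDemo()
{
    WorkSignal s;
    pthread_mutex_init(&s.mutex, NULL);
    pthread_cond_init(&s.cond, NULL);
    s.pending = 0;
    s.handled = 0;

    pthread_t t;
    pthread_create(&t, NULL, waiter, &s);

    // Change the predicate and signal while holding the mutex.
    pthread_mutex_lock(&s.mutex);
    s.pending = 1;
    pthread_cond_signal(&s.cond);
    pthread_mutex_unlock(&s.mutex);

    pthread_join(t, NULL);
    pthread_cond_destroy(&s.cond);
    pthread_mutex_destroy(&s.mutex);
    return s.handled;
}
```

Calling runWaitSignalDemo() should return 1 once the waiter has consumed the single item.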

Note that I'm required to use C++98, so I don't have access to std::thread; that's why I use the C pthread functions.

On macOS everything works as expected, and I haven't noticed any problem so far.

Here is my class ThreadPool:

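#include <cstddef>   // size_t
#include <queue>
#include <utility>   // std::pair, std::make_pair
#include <vector>    // std::vector (used for workers)
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>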

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    void    enqueueTask(void (*function)(int, int), int client_fd, int backend_fd);

    void    kill();

private:
    std::vector<pthread_t> workers;
    std::queue<void (*)(int, int)> tasks;
    std::queue<std::pair<int, int> > taskArgs;

    pthread_mutex_t queueMutex;
    pthread_cond_t condition;
    bool stop;

    static void* workerThread(void* arg);
    void run();
};

And here is my implementation:

ThreadPool::ThreadPool(size_t numThreads): stop(false)
{
    pthread_mutex_init(&queueMutex, NULL);
    pthread_cond_init(&condition, NULL);

    for (size_t i = 0; i < numThreads; ++i) {
        pthread_t worker;
        pthread_create(&worker, NULL, workerThread, this);
        workers.push_back(worker);
    }
}

ThreadPool::~ThreadPool()
{
    if (!stop) {
        this->kill();
    }   
}

void    ThreadPool::kill()
{
    pthread_mutex_lock(&queueMutex);
    stop = true;
    pthread_cond_broadcast(&condition);
    pthread_mutex_unlock(&queueMutex);

    for (size_t i = 0; i < workers.size(); ++i) {
        pthread_cancel(workers[i]);
    }

    for (size_t i = 0; i < workers.size(); ++i) {
        pthread_join(workers[i], NULL);
    }

    pthread_mutex_destroy(&queueMutex);
    pthread_cond_destroy(&condition);
}

void    ThreadPool::enqueueTask(void (*function)(int, int), int client_fd, int backend_fd)
{
    pthread_mutex_lock(&queueMutex);
    tasks.push(function);
    taskArgs.push(std::make_pair(client_fd, backend_fd));
    pthread_cond_signal(&condition);
    pthread_mutex_unlock(&queueMutex);
}

void    *ThreadPool::workerThread(void* arg)
{
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    ThreadPool* pool = (ThreadPool*)arg;
    pool->run();
    return NULL;
}

void    ThreadPool::run()
{
    while (true) {
        pthread_mutex_lock(&queueMutex);

        while (!stop && tasks.empty()) {
            pthread_cond_wait(&condition, &queueMutex);
        }

        if (stop) {
            pthread_mutex_unlock(&queueMutex);
            return;
        }

        void (*task)(int, int) = tasks.front();
        std::pair<int, int> args = taskArgs.front();
        tasks.pop();
        taskArgs.pop();

        pthread_mutex_unlock(&queueMutex);
        
        pthread_testcancel();

        task(args.first, args.second);

        pthread_testcancel();
    }
}

My problem is the following: on Linux (at least Ubuntu), when I call ThreadPool::kill() to stop all threads, pthread_cond_broadcast does not seem to wake up the threads, and they remain blocked in pthread_cond_wait. The main thread then waits for them forever in pthread_join, which blocks the program completely, and the only way to stop it is to send SIGKILL to the process.
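One thing that may be relevant, although I have not confirmed it is the cause here: POSIX specifies that pthread_cond_wait is a cancellation point, and that with deferred cancellation the mutex is re-acquired before the first cancellation cleanup handler runs. A thread cancelled inside the wait therefore terminates while owning the mutex unless a cleanup handler releases it. The sketch below uses hypothetical names (unlockMutex, cancellableWaiter, cancelWaiters) and shows waiters that register such a handler, so cancelling several of them does not leave the mutex locked.

```cpp
#include <pthread.h>
#include <cstddef>

static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_cond  = PTHREAD_COND_INITIALIZER;
static bool            g_ready = false;  // never set: waiters only leave via cancel

// Cleanup handler: releases the mutex that a cancelled waiter still owns.
static void unlockMutex(void* arg)
{
    pthread_mutex_unlock(static_cast<pthread_mutex_t*>(arg));
}

static void* cancellableWaiter(void*)
{
    pthread_mutex_lock(&g_mutex);
    // Push/pop must be paired in the same scope (they may be macros).
    pthread_cleanup_push(unlockMutex, &g_mutex);
    while (!g_ready)
        pthread_cond_wait(&g_cond, &g_mutex);  // cancellation point
    pthread_cleanup_pop(1);  // non-zero: also unlock on the normal path
    return NULL;
}

// Starts up to 8 waiters, cancels them all, and returns how many were joined.
int cancelWaiters(int count)
{
    const int kMax = 8;
    pthread_t threads[kMax];
    if (count > kMax)
        count = kMax;

    int started = 0;
    for (int i = 0; i < count; ++i) {
        if (pthread_create(&threads[started], NULL, cancellableWaiter, NULL) == 0)
            ++started;
    }
    for (int i = 0; i < started; ++i)
        pthread_cancel(threads[i]);

    int joined = 0;
    for (int i = 0; i < started; ++i) {
        if (pthread_join(threads[i], NULL) == 0)
            ++joined;
    }
    return joined;
}
```

Without the cleanup handler, the first cancelled waiter would exit holding g_mutex and the remaining waiters could never re-acquire it, which would look very much like the hang described above.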

Here is an example:

// Written from a signal handler, so it has to be volatile sig_atomic_t.
volatile sig_atomic_t running = 1;

static void interruptHandler(int sig_int) {
    (void)sig_int;
    running = 0;
}

void task(int a, int b)
{
    size_t i = a + b;
    size_t r = 0;

    for (; i < 1000000; i++)
        r = a + i;
    sleep(1);
    std::cout << "task done: " << r << std::endl;
}

int main(void)
{
    ThreadPool *pool = new ThreadPool(4);

    signal(SIGINT, interruptHandler);

    while (running)
    {
        sleep(1);
        pool->enqueueTask(task, 0, 0);
    }

    pool->kill();
}

When I hit CTRL+C, the process hangs completely and can only be stopped by sending SIGKILL.

Note that I'm using PTHREAD_CANCEL_DEFERRED because I want my threads to finish their current task before being cancelled, but the problem does not seem to come from a blocking task, because if I do

int main(void)
{
    ThreadPool *pool = new ThreadPool(4);

    signal(SIGINT, interruptHandler);

    while (running)
        ;

    pool->kill();
} 

the problem is the same.
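Since the goal is to let threads finish their current task, one alternative I am considering is to drop pthread_cancel entirely and rely only on the stop flag plus pthread_cond_broadcast. This is a sketch under that assumption, with made-up names (MiniPool, miniWorker, shutdownWithoutCancel), not a tested replacement for my class.

```cpp
#include <pthread.h>
#include <cstddef>

// Hypothetical miniature pool state; names are illustrative only.
struct MiniPool {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    bool            stop;
    int             exited;  // workers that left their loop cleanly
};

static void* miniWorker(void* arg)
{
    MiniPool* p = static_cast<MiniPool*>(arg);

    pthread_mutex_lock(&p->mutex);
    // Idle until shutdown is requested; a real worker would also check a queue.
    while (!p->stop)
        pthread_cond_wait(&p->cond, &p->mutex);
    ++p->exited;
    pthread_mutex_unlock(&p->mutex);  // always released before returning
    return NULL;
}

// Starts up to 8 workers, stops them cooperatively, returns how many exited.
int shutdownWithoutCancel(int count)
{
    const int kMax = 8;
    pthread_t threads[kMax];
    if (count > kMax)
        count = kMax;

    MiniPool p;
    pthread_mutex_init(&p.mutex, NULL);
    pthread_cond_init(&p.cond, NULL);
    p.stop = false;
    p.exited = 0;

    int started = 0;
    for (int i = 0; i < count; ++i) {
        if (pthread_create(&threads[started], NULL, miniWorker, &p) == 0)
            ++started;
    }

    // Request shutdown: set the flag under the mutex, then wake everyone.
    pthread_mutex_lock(&p.mutex);
    p.stop = true;
    pthread_cond_broadcast(&p.cond);
    pthread_mutex_unlock(&p.mutex);

    // No pthread_cancel: each worker sees the flag and returns on its own.
    for (int i = 0; i < started; ++i)
        pthread_join(threads[i], NULL);

    pthread_cond_destroy(&p.cond);
    pthread_mutex_destroy(&p.mutex);
    return p.exited;
}
```

With this shape, a worker that is in the middle of a task simply notices the flag the next time it takes the mutex, so no cancellation points are involved.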

I do not understand why Linux behaves this way, or how to fix the hang.

Thank you for your help.

Upvotes: 1

Views: 55
