shantanu

Reputation: 2418

Conditional wait of pthread never returns in c++

I am trying to implement a queue-based worker using pthreads, but I am confused about how pthread_cond_wait() behaves.

class Worker

class Worker {
    private:
        pthread_t thread;
        vector<int> queue;
        bool stop;
        pthread_mutex_t mutex;
        pthread_cond_t cond;
    public:
        Worker() {
            stop = false;
            if (pthread_mutex_init(&mutex, NULL) != 0)
            {
                printf("\n mutex init failed\n");
            }

            if(pthread_cond_init(&cond,NULL) != 0){
                printf("\n cond init failed\n");
            }
        }
        ~Worker() {
            pthread_mutex_destroy(&mutex);
            pthread_cond_destroy(&cond);
        }
        void interrupt(){
            printf("Going to inturrupt\n");
            pthread_mutex_lock(&mutex);
            pthread_cond_signal(&cond); //broadcast also doesn't work
            pthread_mutex_unlock(&mutex);
            printf("inturrupted \n");
        }
        void condition_lock(){
            pthread_mutex_lock(&mutex);
            while(queue.size() == 0){
                printf("Entering conditional lock\n");
                pthread_cond_wait(&cond,&mutex);
            }
            pthread_mutex_unlock(&mutex);
        }
        void *run(){
            printf("run\n");
            while(!stop){
                printf("Going for condition lock");
                printf("size: %d\n",queue.size());
                condition_lock();
                printf("Exit from condition lock");
                while(queue.size() > 0){
                    printf("item: %d\n",queue[0]);
                    queue.pop_back();
                }
            }
            pthread_exit(NULL);
        }
        void push(int value){
           pthread_mutex_lock(&mutex);
           queue.push_back(value);
           pthread_mutex_unlock(&mutex);
        }
        void join(){
            void *status;
            pthread_join(thread,&status);
        }
        static void *run_helper(void* context){
            return ((Worker *)context)->run();
        }
        void stop_thread(){
            stop = true;
            interrupt();
        }
        void start_thread(Worker worker){
            stop = false;
            int status = pthread_create(&thread,NULL,run_helper,&worker);
        }
};

Main

int main(){
    Worker worker;
    worker.start_thread(worker);
    usleep(500000);
    for(int i=0;i<5;i++){
        worker.push(i);
        worker.interrupt();
        usleep(500000);
    }
    worker.stop_thread();
    worker.join();
    printf("Thread exit\n");
    return 0;
}

output

run
Going for condition locksize: 0
Entering conditional lock
Going to inturrupt
inturrupted 
Going to inturrupt
inturrupted 
Going to inturrupt
inturrupted 
Going to inturrupt
inturrupted 
Going to inturrupt
inturrupted 
Going to inturrupt
inturrupted 

It never returns from pthread_cond_wait(). I also don't understand how pthread_mutex_lock() can succeed in the interrupt() method, because the mutex should already be locked by condition_lock().

edit

I have made two changes to the code, as suggested:

1. Use queue.size() == 0 as the wait condition instead of a boolean condition parameter.
2. Lock and unlock the mutex around queue.push_back().

Upvotes: 3

Views: 735

Answers (2)

Loki Astari

Reputation: 264729

Bug here:

    void start_thread(Worker worker){ // worker passed by value
                                      // thus it is a copy.
        stop = false;
        int status = pthread_create(&thread,NULL,
                         run_helper,&worker); // Address of worker passed to thread.
    }   // worker destroyed here.

You are passing the worker in by value (thus getting a copy). The thread runs against this copy, but the copy is destroyed when the function returns, so the thread is left with a pointer to a destroyed object (and its mutex and cond are no longer valid).

The parameter is not needed at all, since this and worker should be the same object.

Fix with:

     void start_thread(){
          stop = false;
          int status = pthread_create(&thread, NULL, run_helper, this);
      }
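
One way to have the compiler catch this kind of accidental copy is to make the class non-copyable. The following is a minimal sketch, assuming C++11 is available; NonCopyableWorker is a hypothetical cut-down class, not the code from the question, and it only shows the members that matter for copying.

```cpp
#include <pthread.h>
#include <type_traits>

// Hypothetical cut-down worker: only the members relevant to copying are shown.
class NonCopyableWorker {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
public:
    NonCopyableWorker() {
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&cond, NULL);
    }
    ~NonCopyableWorker() {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&mutex);
    }
    // A copy would duplicate the raw mutex/cond and destroy them twice,
    // so copying is disabled outright.
    NonCopyableWorker(const NonCopyableWorker &) = delete;
    NonCopyableWorker &operator=(const NonCopyableWorker &) = delete;
};

// Compile-time confirmation that the class can no longer be passed by value.
static_assert(!std::is_copy_constructible<NonCopyableWorker>::value,
              "NonCopyableWorker must not be copyable");
```

With the copy constructor deleted, a call such as worker.start_thread(worker) with a by-value parameter would fail to compile instead of silently running the thread against a temporary.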

This is wrong:

   void condition_lock(bool condition){
        pthread_mutex_lock(&mutex);
        if(condition){
    //  ^^^^ Should be while(<validate some invariant>)
            printf("Entering conditional lock\n");
            pthread_cond_wait(&cond,&mutex);
        }
        pthread_mutex_unlock(&mutex);
    }

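You really want to pass a function through here, so that the condition can be re-validated each time the thread is woken. A bool parameter is evaluated only once, at the call site, so it cannot reflect changes made while the thread was waiting.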
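
Passing a function could look roughly like the sketch below. The helper wait_until, the Shared struct, and the producer/consume_when_ready functions are hypothetical names invented for this illustration (they are not part of pthreads or of the question's code), and a C++11 lambda is assumed for the predicate.

```cpp
#include <pthread.h>

// Hypothetical helper: block until pred() returns true.
// The caller must already hold `mutex`; pred() is re-evaluated after every wakeup.
template <typename Pred>
void wait_until(pthread_cond_t *cond, pthread_mutex_t *mutex, Pred pred) {
    while (!pred()) {
        pthread_cond_wait(cond, mutex);
    }
}

// Hypothetical shared state used only to exercise the helper.
struct Shared {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int items;
};

// Producer thread: changes the state under the lock, then signals.
extern "C" void *producer(void *arg) {
    Shared *s = static_cast<Shared *>(arg);
    pthread_mutex_lock(&s->mutex);
    s->items = 3;
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}

// Waits until the producer has stored something, then returns the item count.
int consume_when_ready() {
    Shared s;
    pthread_mutex_init(&s.mutex, NULL);
    pthread_cond_init(&s.cond, NULL);
    s.items = 0;

    pthread_t t;
    pthread_create(&t, NULL, producer, &s);

    pthread_mutex_lock(&s.mutex);
    wait_until(&s.cond, &s.mutex, [&s] { return s.items > 0; });
    int seen = s.items;
    pthread_mutex_unlock(&s.mutex);

    pthread_join(t, NULL);
    pthread_cond_destroy(&s.cond);
    pthread_mutex_destroy(&s.mutex);
    return seen;
}
```

Because the predicate is checked before the first wait, this also works if the producer signals before the consumer ever reaches pthread_cond_wait().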

You are mutating the state of the object without acquiring the lock.

    void push(int value){
       queue.push_back(value);
    }

There are two threads running through this class. Whenever the state is being modified you need to acquire the lock. State is modified in several methods (even modifying stop should be done under the lock).
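
A small RAII guard is one way to make it hard to forget the unlock. This is only a sketch: MutexGuard and StopFlag are hypothetical names for illustration, and return codes are ignored to keep it short.

```cpp
#include <pthread.h>

// Hypothetical RAII guard: locks in the constructor, unlocks in the destructor.
class MutexGuard {
    pthread_mutex_t *m;
    // Not copyable: a copy would unlock the mutex twice.
    MutexGuard(const MutexGuard &);
    MutexGuard &operator=(const MutexGuard &);
public:
    explicit MutexGuard(pthread_mutex_t *mutex) : m(mutex) {
        pthread_mutex_lock(m);
    }
    ~MutexGuard() {
        pthread_mutex_unlock(m);
    }
};

// Hypothetical holder for a stop flag; every read and write takes the lock.
class StopFlag {
    pthread_mutex_t mutex;
    bool stop;
public:
    StopFlag() : stop(false) { pthread_mutex_init(&mutex, NULL); }
    ~StopFlag() { pthread_mutex_destroy(&mutex); }

    void request_stop() {
        MutexGuard guard(&mutex);
        stop = true;
    }
    bool stop_requested() {
        MutexGuard guard(&mutex);
        return stop;  // the guard unlocks after the value has been read
    }
};
```

A worker loop that has to wait on a condition variable still needs the raw mutex for pthread_cond_wait(), so a guard like this fits best around short accessors such as push() or a stop setter.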

Technically this is not a valid target for a C function callback.

    static void *run_helper(void* context){
        return ((Worker *)context)->run();
    }

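C does not know the C++ ABI. pthreads is a C library, so the only valid pointer you can pass as a callback is a pointer to a function with C linkage (declared extern "C"). A static member function has C++ linkage, even though it happens to work on most platforms.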

I also don't understand how pthread_mutex_lock() is working in void interrupt()

    pthread_mutex_lock(&mutex);
    pthread_cond_wait(&cond,&mutex); // The call to wait releases the lock
                                     // on the mutex. When the thread is woken
                                     // up it must reacquire the lock before
                                     // it returns from pthread_cond_wait().
                                     // This allows another thread to lock the
                                     // mutex, modify state, then call the signal
                                     // method before releasing the lock.
                                     // This allows interrupt() to run as expected.

Note: Just because you call signal does not mean the other thread is immediately scheduled for execution (it just becomes available to run). Your code is in such a tight loop that the woken thread may have trouble acquiring the lock it needs to return from pthread_cond_wait().
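
The fact that the wait releases the mutex can be observed directly. In the sketch below (all names are hypothetical and exist only for this demonstration), the waiter sets a flag under the lock and never unlocks explicitly before waiting, so if the other thread manages to lock the mutex and sees the flag, the waiter must be blocked inside pthread_cond_wait() with the mutex released.

```cpp
#include <pthread.h>
#include <sched.h>

// Hypothetical state for the demonstration.
struct Handshake {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool waiting;  // set by the waiter, under the lock, just before it waits
    bool ready;    // set by the signaller to let the waiter continue
};

extern "C" void *waiter(void *arg) {
    Handshake *h = static_cast<Handshake *>(arg);
    pthread_mutex_lock(&h->mutex);
    h->waiting = true;
    while (!h->ready) {
        // Atomically releases the mutex while blocked, reacquires it on wakeup.
        pthread_cond_wait(&h->cond, &h->mutex);
    }
    pthread_mutex_unlock(&h->mutex);
    return NULL;
}

// Returns true once the calling thread has held the mutex while the waiter waits.
bool signaller_can_lock_while_waiter_waits() {
    Handshake h;
    pthread_mutex_init(&h.mutex, NULL);
    pthread_cond_init(&h.cond, NULL);
    h.waiting = false;
    h.ready = false;

    pthread_t t;
    pthread_create(&t, NULL, waiter, &h);

    bool locked_during_wait = false;
    while (!locked_during_wait) {
        pthread_mutex_lock(&h.mutex);
        if (h.waiting) {
            // We hold the mutex and the waiter has not unlocked it explicitly,
            // so the waiter can only be inside pthread_cond_wait().
            locked_during_wait = true;
            h.ready = true;
            pthread_cond_signal(&h.cond);
        }
        pthread_mutex_unlock(&h.mutex);
        if (!locked_during_wait) {
            sched_yield();  // the waiter has not started yet; let it run
        }
    }

    pthread_join(t, NULL);
    pthread_cond_destroy(&h.cond);
    pthread_mutex_destroy(&h.mutex);
    return locked_during_wait;
}
```

The waiter only returns from pthread_cond_wait() after the signalling thread has unlocked the mutex, which is the same sequence interrupt() relies on.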

Fixed Code:

I have left the boring fixes that still need to be done for you: you MUST check the result of every library call to validate that it worked. If a call fails, the very least you can do is throw an exception.

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <vector>
#include <iostream>

using namespace std;

// LA: Callback must be a C function.
extern "C" void *run_helper(void* context);
class Worker {
    private:
        pthread_t thread;
        vector<int> queue;
        bool stop;
        pthread_mutex_t mutex;
        pthread_cond_t cond;
    public:
        Worker() {
            stop = false;
            if (pthread_mutex_init(&mutex, NULL) != 0)
            {
                printf("\n mutex init failed\n");
            }

            if(pthread_cond_init(&cond,NULL) != 0){
                printf("\n cond init failed\n");
            }
        }
        ~Worker() {
            pthread_mutex_destroy(&mutex);
            pthread_cond_destroy(&cond);
        }
        void interrupt(){
            printf("Going to inturrupt\n");
            pthread_mutex_lock(&mutex);
            pthread_cond_signal(&cond); //broadcast also doesn't work
            pthread_mutex_unlock(&mutex);
            printf("inturrupted \n");
        }

        void *run(){
            printf("run\n");
            pthread_mutex_lock(&mutex);
            while(!stop){
                printf("Going for condition lock\n");
                printf("size: %lu\n",queue.size());



                // LA: Moved condition_lock() inline.
                //     This was because we needed the lock around
                //     accessing the state after the wait
                // LA: Check queue size and if we are stopped after being woken
                while(queue.size() == 0 && !stop){
                    printf("Entering conditional lock\n");
                    pthread_cond_wait(&cond,&mutex);
                }
                printf("Exit from condition lock\n");
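                while(queue.size() > 0){
                    // Print and remove the same (front) element; the original
                    // printed queue[0] but removed the last element.
                    printf("item: %d\n",queue.front());
                    queue.erase(queue.begin());
                }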
            }
            pthread_mutex_unlock(&mutex);
            pthread_exit(NULL);
        }
        void push(int value){
            // LA: All state mutation needs to be guarded.
            pthread_mutex_lock(&mutex);
           queue.push_back(value);
            pthread_mutex_unlock(&mutex);
        }
        void join(){
            void *status;
            pthread_join(thread,&status);
        }
        void stop_thread(){
            // LA: All state mutation needs to be guarded.
            pthread_mutex_lock(&mutex);
            stop = true;
            pthread_mutex_unlock(&mutex);
            interrupt();
        }
        void start_thread(){
            int status = pthread_create(&thread,NULL,run_helper,this);
        }
};

extern "C" void *run_helper(void* context){
    return ((Worker *)context)->run();
}


int main(){
    Worker worker;
    worker.start_thread();
    usleep(500000);
    for(int i=0;i<5;i++){
        worker.push(i);
        worker.interrupt();
        usleep(500000);
    }
    worker.stop_thread();
    worker.join();
    printf("Thread exit\n");
    return 0;
}
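
The result checking mentioned above could be sketched as follows. check_pthread and misuse_is_reported are hypothetical names for illustration; the sketch relies on the fact that pthread functions return 0 on success and an error number on failure (they do not set errno), and it uses an error-checking mutex so that a deliberate misuse is reported.

```cpp
#include <pthread.h>
#include <string.h>
#include <stdexcept>
#include <string>

// Hypothetical helper: turn a non-zero pthread return code into an exception.
inline void check_pthread(int rc, const char *what) {
    if (rc != 0) {
        throw std::runtime_error(std::string(what) + ": " + strerror(rc));
    }
}

// Demonstration: unlocking an error-checking mutex that is not locked
// returns an error, which the helper converts into an exception.
bool misuse_is_reported() {
    pthread_mutexattr_t attr;
    pthread_mutex_t mutex;
    check_pthread(pthread_mutexattr_init(&attr), "pthread_mutexattr_init");
    check_pthread(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK),
                  "pthread_mutexattr_settype");
    check_pthread(pthread_mutex_init(&mutex, &attr), "pthread_mutex_init");
    pthread_mutexattr_destroy(&attr);

    bool reported = false;
    try {
        check_pthread(pthread_mutex_unlock(&mutex), "pthread_mutex_unlock");
    } catch (const std::runtime_error &) {
        reported = true;
    }

    pthread_mutex_destroy(&mutex);
    return reported;
}
```

In the Worker class the same helper could wrap pthread_mutex_init(), pthread_cond_init(), and pthread_create(); throwing from a destructor is best avoided, so failures of the destroy calls are usually logged instead.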

Upvotes: 1

Lightness Races in Orbit

Reputation: 385405

I also don't understand how pthread_mutex_lock() is working in void interrupt().

It's not. It's causing a deadlock.

it should be already locked by void condition_lock().

It is. That's why:

It's never return from pthread_cond_wait().

Upvotes: -2
