the_martian
the_martian

Reputation: 634

Trying to understand pthread_cond_lock and pthread_cond_signal

So I'm trying to understand exactly how pthread_mutex_lock works.

My current understanding is that it unlocks the mutex and puts whatever thread is going though it to sleep. Sleep meaning that the thread is inactive and consuming no resources.

It then waits for a signal to go from asleep to blocked, meaning that the thread can no longer change any variables.

thread 1:
    pthread_mutex_lock(&mutex);
    while (!condition){
        printf("Thread wating.\n");
        pthread_cond_wait(&cond, &mutex);
        printf("Thread awakened.\n");
        fflush(stdout);
   }
   pthread_mutex_unlock(&mutex);

   pthread_cond_signal(&condVar);
   pthread_mutex_unlock(&mutex);

So basically in the sample above, the loop runs and runs and each iteration pthread_cond_wait checks if the condition of the loop is true. If it is then the cond_signal is sent and the thread is blocked so it can't manipulate any more data.

I'm really having trouble wrapping my head around this, I'd appreciate some input and feedback about how this works and whether or not I am beginning to understand this based on what I have above.

I've gone over this post but am still having trouble

Upvotes: 1

Views: 1454

Answers (2)

Mecki
Mecki

Reputation: 133079

pthread_cond_wait() simply means that the current thread shall release the mutex and then waits on a condition. The trick here is that both happens atomically, so it cannot happen, that the thread has released the mutex and is not yet waiting on the condition or is already waiting on the condition and has not yet released the mutex. Either both has happened or none has happened.

pthread_cond_signal() simply wakes up any thread that is currently waiting on the signaled condition. The first thing the woken up thread will do is obtaining the mutex again, if it cannot obtain it (e.g. as the signaling thread is currently owning the mutex), it will block until it can. If multiple threads are waiting on the condition, pthread_cond_signal() just wakes up one of them, which one is not defined. If you want to wake up all the waiting threads, you must use pthread_cond_broadcast() instead; but of course they won't run at the same time as now each of them first requires to obtain the mutex and that will only be possible one after another.

pthread_cond_t has no state. If you signal a condition no thread is waiting for, then nothing will happen. It's not like this will set a flag internally and if later on some thread calls pthread_cond_wait(), it will be woken up immediately as there is a pending signal. pthread_cond_signal() only wakes up threads that are already waiting, that means these threads must have called pthread_cond_wait() prior to you calling pthread_cond_signal().

Here's some simple sample code. First a reader thread:

// === Thread 1 ===

// We want to process an item from a list.
// To make sure the list is not altered by one
// thread while another thread is accessing it,
// it is protected by a mutex.
pthread_mutex_lock(&listLock);

// Now nobody but us is allowed to access the list.
// But what if the list is empty?
while (list->count == 0) {
    // As long as we hold the mutex, no other thread
    // thread can add anything to the list. So we
    // must release it. But we want to know as soon
    // as another thread has changed it.
    pthread_cond_wait(&listCondition, &listLock);

    // When we get here, somebody has signaled the
    // condition and we have the mutex again and
    // thus are allowed to access the list. The list
    // may however still be empty, as another thread
    // may have already consumed the new item in case
    // there are multiple readers and all are woken 
    // up, thus the while-loop. If the list is still
    // empty, we just go back to sleep and wait again.
}

// If we get here, the list is not empty.
processListItem(list);

// Finally we release the mutex again.
pthread_mutex_unlock(&listLock);

And then a writer thread:

// === Thread 2 ===

// We want to add a new item to the list.
// To make sure that nobody is accessing the
// list while we do, we need to obtain the mutex.
pthread_mutex_lock(&listLock);

// Now nobody but us is allowed to access the list.
// Check if the list is empty.
bool listWasEmpty = (list->count == 0);

// We add our item.
addListItem(list, newItem);

// If the list was empty, one or even multiple
// threads may be waiting for us adding an item.
// So we should wake them up here.
if (listWasEmpty) {
    // If any thread is waiting for that condition,
    // wake it up as now there is an item to process.
    pthread_cond_signal(&listCondition);
}

// Finally we must release the mutex again.
pthread_mutex_unlock(&listLock);

The code is written so that there can be any number of reader/writer threads. Signaling only if the list was empty (listWasEmpty) is just a performance optimization, the code would also work correctly if you always signal the condition after adding an item.

Upvotes: 2

Nominal Animal
Nominal Animal

Reputation: 39396

First, a summary:

  • pthread_mutex_lock(&mutex):

    If mutex is free, then this thread grabs it immediately.

    If mutex is grabbed, then this thread waits until the mutex becomes free, and then grabs it.
     

  • pthread_mutex_trylock(&mutex):

    If mutex is free, then this thread grabs it.

    If mutex is grabbed, then the call returns immediately with EBUSY.
     

  • pthread_mutex_unlock(&mutex):

    Releases mutex.
     

  • pthread_cond_signal(&cond):

    Wake up one thread waiting on the condition variable cond.
     

  • pthread_cond_broadcast(&cond):

    Wake up all threads waiting on the condition variable cond.
     

  • pthread_cond_wait(&cond, &mutex):

    This must be called with mutex grabbed.

    The calling thread will temporarily release mutex and wait on cond.

    When cond is broadcast on, or signaled on and this thread happens to be the one woken up, then the calling thread will first re-grab the mutex, and then return from the call.

    It is important to note that at all times, the calling thread either has mutex grabbed, or is waiting on cond. There is no interval in between.


Let's look at a practical, running example code. We'll create it along the lines of OP's code.

First, we'll use a structure to hold the parameters for each worker function. Since we'll want the mutex and the condition variable to be shared between threads, we'll use pointers.

#define  _POSIX_C_SOURCE  200809L
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>

/* Worker function work. */
struct work {
    pthread_t        thread_id;
    pthread_mutex_t *lock;      /* Pointer to the mutex to use */
    pthread_cond_t  *wait;      /* Pointer to the condition variable to use */
    volatile int    *done;      /* Pointer to the flag to check */
    FILE            *out;       /* Stream to output to */
    long             id;        /* Identity of this thread */
    unsigned long    count;     /* Number of times this thread iterated. */
};

The thread worker function receives a pointer to the above structure. Each thread iterates the loop once, then waits on the condition variable. When woken up, if the done flag is still zero, the thread iterates the loop. Otherwise, the thread exits.

/* Example worker function. */
void *worker(void *workptr)
{
    struct work *const work = workptr;

    pthread_mutex_lock(work->lock);

    /* Loop as long as *done == 0: */
    while (!*(work->done)) {
        /* *(work->lock) is ours at this point. */

        /* This is a new iteration. */
        work->count++;

        /* Do the work. */
        fprintf(work->out, "Thread %ld iteration %lu\n", work->id, work->count);
        fflush(work->out);

        /* Wait for wakeup. */
        pthread_cond_wait(work->wait, work->lock);
    }

    /* *(work->lock) is still ours, but we've been told that all work is done already. */
    /* Release the mutex and be done. */
    pthread_mutex_unlock(work->lock);
    return NULL;
}

To run the above, we'll need a main() as well:

#ifndef  THREADS
#define  THREADS  4
#endif

int main(void)
{
    pthread_mutex_t  lock = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t   wait = PTHREAD_COND_INITIALIZER;
    volatile int     done = 0;
    struct work      w[THREADS];

    char            *line = NULL, *p;
    size_t           size = 0;
    ssize_t          len  = 0;

    unsigned long    total;
    pthread_attr_t   attrs;
    int              i, err;

    /* The worker functions require very little stack, but the default stack
       size is huge. Limit that, to reduce the (virtual) memory use. */
    pthread_attr_init(&attrs);
    pthread_attr_setstacksize(&attrs, 2 * PTHREAD_STACK_MIN);

    /* Grab the mutex so the threads will have to wait to grab it. */
    pthread_mutex_lock(&lock);

    /* Create THREADS worker threads. */
    for (i = 0; i < THREADS; i++) {

        /* All threads use the same mutex, condition variable, and done flag. */
        w[i].lock = &lock;
        w[i].wait = &wait;
        w[i].done = &done;

        /* All threads output to standard output. */
        w[i].out = stdout;

        /* The rest of the fields are thread-specific. */
        w[i].id = i + 1;
        w[i].count = 0;

        err = pthread_create(&(w[i].thread_id), &attrs, worker, (void *)&(w[i]));
        if (err) {
            fprintf(stderr, "Cannot create thread %d of %d: %s.\n", i+1, THREADS, strerror(errno));
            exit(EXIT_FAILURE);  /* Exits the entire process, killing any other threads as well. */
        }
    }

    fprintf(stderr, "The first character on each line controls the type of event:\n");
    fprintf(stderr, "    e, q    exit\n");
    fprintf(stderr, "    s       signal\n");
    fprintf(stderr, "    b       broadcast\n");
    fflush(stderr);

    /* Let each thread grab the mutex now. */
    pthread_mutex_unlock(&lock);

    while (1) {
        len = getline(&line, &size, stdin);
        if (len < 1)
            break;

        /* Find the first character on the line, ignoring leading whitespace. */
        p = line;
        while ((p < line + len) && (*p == '\0' || *p == '\t' || *p == '\n' ||
                                    *p == '\v' || *p == '\f' || *p == '\r' || *p == ' '))
            p++;

        /* Do the operation mentioned */
        if (*p == 'e' || *p == 'E' || *p == 'q' || *p == 'Q')
            break;
        else
        if (*p == 's' || *p == 'S')
            pthread_cond_signal(&wait);
        else
        if (*p == 'b' || *p == 'B')
            pthread_cond_broadcast(&wait);
    }

    /* It is time for the worker threads to be done. */
    pthread_mutex_lock(&lock);
    done = 1;
    pthread_mutex_unlock(&lock);

    /* To ensure all threads see the state of that flag,
       we wake up all threads by broadcasting on the condition variable. */
    pthread_cond_broadcast(&wait);

    /* Reap all threds. */
    for (i = 0; i < THREADS; i++)
        pthread_join(w[i].thread_id, NULL);

    /* Output the thread statistics. */
    total = 0;
    for (i = 0; i < THREADS; i++) {
        total += w[i].count;
        fprintf(stderr, "Thread %ld: %lu events.\n", w[i].id, w[i].count);
    }
    fprintf(stderr, "Total: %lu events.\n", total);

    return EXIT_SUCCESS;
}

If you save the above as example.c, you can compile it to example using e.g. gcc -Wall -O2 example.c -lpthread -o example.

To get the correct intuitive grasp of the operations, run the example in a terminal, with the source code in a window next to it, and see how the execution progresses as you provide input.

You can even run commands like printf '%s\n' s s s b q | ./example to run a sequence of events in a quick succession, or printf 's\ns\ns\nb\nq\n' | ./example with even less time in between events.

After some experimentation, you'll hopefully find out that not all input events cause their respective action. This is because the exit event (q above) is not synchronous: it does not wait for all pending work to be done, but tells the threads to exit right then and there. That is why the number of events may vary even for the exact same input.

(Also, if you signal on the condition variable, and immediately broadcast on it, the threads tend to only get woken up once.)

You can mitigate that by delaying the exit, using e.g. (printf '%s\n' s s b s s s ; sleep 1 ; printf 'q\n' ) | ./example.

However, there are better ways. A condition variable is not suitable for countable events; it is really flag-like. A semaphore would work better, but then you should be careful to not overflow the semaphore; it can only be from 0 to SEM_VALUE_MAX, inclusive. (So, you could use a semaphore to represent the number of pending job, but probably not for the number of iterations done by each/all thread workers.) A queue for the work to do, in thread pool fashion, is the most common approach.

Upvotes: 4

Related Questions