Alex Reynolds
Alex Reynolds

Reputation: 96927

How to get two pthread threads to respond to each others' wait and signal conditions?

I'm having a bit of trouble getting a basic two-thread arrangement working.

I am reading a chunk of bytes into memory from stdin in one "producer" thread, and processing those bytes in a second "consumer" thread, once those bytes are available. Once the bytes are consumed, the consumer thread goes back to being dormant and the producer thread gets running again.

I am using pthread_cond_wait() and pthread_cond_signal() to have the two threads communicate to each other that data are produced or consumed.

Here is the code for the two threads:

void * produce_bytes(void *t_data)
{ 
    pthread_data_t *d = (pthread_data_t *)t_data;

    do {
        pthread_mutex_lock(&d->input_lock);
        d->n_bytes = fread(d->in_buf, sizeof(unsigned char), BUF_LENGTH_VALUE, stdin);
        if (d->n_bytes > 0) { 
            fprintf(stdout, "PRODUCER ...signaling consumer...\n");
            pthread_cond_signal(&d->input_cond);
            fprintf(stdout, "PRODUCER ...consumer signaled...\n");
        }
        pthread_mutex_unlock(&d->input_lock);
    } while (d->n_bytes > 0);

    return NULL;
}

void * consume_bytes(void *t_data) 
{
    pthread_data_t *d = (pthread_data_t *)t_data;

    pthread_mutex_lock(&d->input_lock);
    while (d->n_bytes == 0)
        pthread_cond_wait(&d->input_cond, &d->input_lock);
    fprintf(stdout, "CONSUMER ...consuming chunk...\n");
    d->n_bytes = 0;
    fprintf(stdout, "CONSUMER ...chunk consumed...\n");
    pthread_mutex_unlock(&d->input_lock);
}

The pthread_data_t is a struct I use to keep track of state:

typedef struct {
    pthread_mutex_t input_lock;
    pthread_cond_t input_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_bytes;
} pthread_data_t;

I configure variables in my main() function; here is the relevant excerpt:

pthread_t producer_thread = NULL;
pthread_t consumer_thread = NULL;
pthread_data_t *thread_data = NULL;

thread_data = malloc(sizeof(pthread_data_t));
thread_data->n_bytes = 0;
pthread_mutex_init(&(thread_data->input_lock), NULL);
pthread_cond_init(&(thread_data->input_cond), NULL);

pthread_create(&producer_thread, NULL, produce_bytes, (void *) thread_data);
pthread_create(&consumer_thread, NULL, consume_bytes, (void *) thread_data);

pthread_join(producer_thread, NULL);
pthread_join(consumer_thread, NULL);

When I run this, produce_bytes() signals consume_bytes() successfully on the first iteration, but on the second and subsequent iterations, a signal is sent to consume_bytes() and it never gets heard, so the consumer function never gets run again:

PRODUCER ...signaling consumer...
PRODUCER ...consumer signaled...
CONSUMER ...consuming chunk...
CONSUMER ...chunk consumed...
PRODUCER ...signaling consumer...
PRODUCER ...consumer signaled...
PRODUCER ...signaling consumer...
PRODUCER ...consumer signaled...
PRODUCER ...signaling consumer...
PRODUCER ...consumer signaled...
...

I am using the tutorial here as the basis for what I'm trying to do. What I am doing wrong?

Upvotes: 0

Views: 599

Answers (2)

merlin2011
merlin2011

Reputation: 75545

Here is a working example which addresses Maxim's point 2 and 3, but not 1 because that is necessary for responsiveness but not strictly for correctness.

Note that I have not implemented a means for the producer to signal EOF to the consumer, so the consumer will never exit.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define  BUF_LENGTH_VALUE 100
typedef struct {
    pthread_mutex_t input_lock;
    pthread_cond_t input_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_bytes;
} pthread_data_t;

void * produce_bytes(void *t_data)
{ 
    pthread_data_t *d = (pthread_data_t *)t_data;

    size_t local_byte_count = 0;
    do {
        pthread_mutex_lock(&d->input_lock);
        local_byte_count = fread(d->in_buf, sizeof(unsigned char),
                BUF_LENGTH_VALUE, stdin);

        d->n_bytes += local_byte_count;
        if (d->n_bytes > 0) { 
            fprintf(stdout, "PRODUCER ...signaling consumer...\n");
            pthread_cond_signal(&d->input_cond);
            fprintf(stdout, "PRODUCER ...consumer signaled...\n");
        }
        pthread_mutex_unlock(&d->input_lock);

        // This is added to slow down the producer so that we can observe
        // multiple consumptions.
        sleep(1);
    } while (local_byte_count >  0);



    return NULL;
}

void * consume_bytes(void *t_data) 
{
    pthread_data_t *d = (pthread_data_t *)t_data;
    while (1) {
        pthread_mutex_lock(&d->input_lock);
        while (d->n_bytes == 0) {
            fprintf(stdout, "CONSUMER entering wait \n");
            pthread_cond_wait(&d->input_cond, &d->input_lock);
        }
        fprintf(stdout, "CONSUMER ...consuming chunk...\n");
        d->n_bytes = 0;
        fprintf(stdout, "CONSUMER ...chunk consumed...\n");
        pthread_mutex_unlock(&d->input_lock);
        fflush(stdout);
    }

}

int main(){
    pthread_t producer_thread = NULL;
    pthread_t consumer_thread = NULL;
    pthread_data_t *thread_data = NULL;

    thread_data = malloc(sizeof(pthread_data_t));
    thread_data->n_bytes = 0;
    pthread_mutex_init(&(thread_data->input_lock), NULL);
    pthread_cond_init(&(thread_data->input_cond), NULL);

    pthread_create(&producer_thread, NULL, produce_bytes, (void *) thread_data);
    pthread_create(&consumer_thread, NULL, consume_bytes, (void *) thread_data);

    pthread_join(producer_thread, NULL);
    pthread_join(consumer_thread, NULL);
}

Upvotes: 0

Maxim Egorushkin
Maxim Egorushkin

Reputation: 136208

There are a few issues with that code:

  1. produce_bytes locks the mutex for the duration of the blocking call to fread. A general rule of thumb for responsive applications is to lock the mutex for as short periods as possible. You may like to read the input into a temporary buffer first, then lock the mutex and copy the data to the buffer shared between threads. Same applies to consume_bytes which holds the mutex while calling fprintf which can block.
  2. produce_bytes in while(d->n_bytes > 0) does not hold the mutex, which is a race condition because consume_bytes assigns a new value to d->n_bytes. Assuming you would like to exit that loop when fread returns 0 (EOF), you need to copy the return value of fread into a local variable not shared between threads and use that as the condition in while(read_bytes > 0)
  3. consume_bytes does not have any loop around it so that it returns after the first condition variable notification. You probably would like to wrap it into a while loop and exit only when EOF (0 bytes) have been read.

Upvotes: 1

Related Questions