Alex Reynolds

Reputation: 96937

How to add a second consumer to a pthread-based producer-consumer setup?

I currently have two threads for a producer-consumer setup, which uses pthread_cond_wait() and pthread_cond_signal() to alternate between reading in data and processing it.

Say I have a lock, two conditions, and a Boolean flag that states if the data buffer has data in it:

pthread_mutex_t lock;
pthread_cond_t we_have_data;
pthread_cond_t we_need_data;
bool buffer_is_empty = true;

I have a pthread_t that uses the following function to produce data (read data into a buffer):

static void* produce(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!buffer_is_empty) {
            pthread_cond_wait(&we_need_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // read some data into our buffer
        pthread_mutex_lock(&lock);
        buffer_is_empty = false;
        pthread_cond_signal(&we_have_data);
    }
}

Then I have a second pthread_t that uses the following code to consume that data, upon receiving the we_have_data signal:

static void* consume(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (buffer_is_empty) {
            pthread_cond_wait(&we_have_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // process the data in our buffer
        pthread_mutex_lock(&lock);
        buffer_is_empty = true;
        pthread_cond_signal(&we_need_data);
    }
}

This works correctly.

What I would now like to do is add a third thread that does work on the data from the consume() function, if the buffer contains certain data.

I had tried adding a third condition, but my program hangs.

I set up a condition and Boolean flag:

bool processing_with_second_consumer;
pthread_cond_t we_need_to_process_data_with_another_consumer;

Then I modify the consumer:

static void* consume(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (buffer_is_empty && !processing_with_second_consumer) {
            pthread_cond_wait(&we_have_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // process the data in our buffer
        pthread_mutex_lock(&lock);
        if (data_meets_our_conditions) {
            processing_with_second_consumer = true;
            pthread_cond_signal(&we_need_to_process_data_with_another_consumer);
        }
        buffer_is_empty = true;
        pthread_cond_signal(&we_need_data);
    }
}

And then I modify the producer to wait on the boolean:

static void* produce(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!buffer_is_empty && !processing_with_second_consumer) {
            pthread_cond_wait(&we_need_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // read some data into our buffer
        pthread_mutex_lock(&lock);
        buffer_is_empty = false;
        pthread_cond_signal(&we_have_data);
    }
}

And add a third thread to consume from the consumer:

static void* consume_from_the_consumer(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!buffer_is_empty && processing_with_second_consumer) {
            pthread_cond_wait(&we_need_to_process_data_with_another_consumer, &lock);
        }
        pthread_mutex_unlock(&lock);
        // do more specific processing of the data in our buffer
        pthread_mutex_lock(&lock);
        processing_with_second_consumer = false;
    }
}

I can't seem to get the program to exit correctly — it basically hangs in an infinite loop on consuming from the consumer.

How does one correctly set up signaling with pthread conditions, in order to allow a third (or fourth, or fifth, etc.) thread?

Upvotes: 0

Views: 267

Answers (2)

Alex Reynolds

Reputation: 96937

To solve this for three threads, I needed to make a few changes:

  1. Move the mutex lock into the thread loop; the first thing a loop should do is lock the data, and the last thing it should do is unlock it.
  2. Set up three bool flags: is_new_line_available, is_new_subdata_available, and is_eof.
  3. Set up three pthread_cond_t conditions: new_line_is_available, new_line_is_empty, and new_subdata_is_available.
  4. Make sure every thread has a condition in which pthread_exit() is called to terminate that thread.

The produce thread:

static void* produce(void* arg) {
    for (;;) {
        pthread_mutex_lock(&lock);
        while (is_new_line_available) {
            pthread_cond_wait(&new_line_is_empty, &lock);
        }
        // ... read a line of data into buffer ...
        if (EOF) {
            is_new_line_available = true;
            is_new_subdata_available = true;
            is_eof = false;
            pthread_cond_signal(&new_line_is_available);
            pthread_cond_signal(&new_subdata_is_available);
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }
        is_new_line_available = true;
        is_new_subdata_available = false;
        is_eof = false;
        pthread_cond_signal(&new_line_is_available);
        pthread_mutex_unlock(&lock);
    }
}

The consume thread:

static void* consume(void* arg) {
    for (;;) {
        pthread_mutex_lock(&lock);
        while (!is_new_line_available) {
            pthread_cond_wait(&new_line_is_available, &lock);
        }
        // ... process line of data to look for subdata type ... 
        if (EOF) {
            is_eof = true;
            pthread_cond_signal(&new_subdata_is_available);
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }
        else if (subdata_found) {
            is_new_subdata_available = true;
            is_new_line_available = false;
            pthread_cond_signal(&new_line_is_empty);
        }
        pthread_mutex_unlock(&lock);
    }
}

Then the third "subdata"-handling thread:

static void* consume_subdata_from_the_consumer(void* arg) {
    for (;;) {
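        pthread_mutex_lock(&lock);
        // read is_eof only while holding the lock, and include it in the wait predicate
        while (!is_new_subdata_available && !is_eof) {
            pthread_cond_wait(&new_subdata_is_available, &lock);
        }
        if (is_eof && !is_new_subdata_available) {
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }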
        // ... process subdata ...
        is_new_subdata_available = false;
        is_new_line_available = true;
        pthread_cond_signal(&new_line_is_available);
        pthread_mutex_unlock(&lock);
    }
}

Some observations:

  • All threads should have a condition that gets them to a pthread_exit(), or the parent process will hang.
  • All code that modifies shared state must sit between the lock and unlock calls; otherwise data processed out of order can lead to corruption.
  • Buffer overflows or reads of uninitialized data can cause problems. Use calloc(), for instance, to zero-initialize a character buffer before using it in a thread.
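
The first two observations can be illustrated with a minimal sketch. It is not the code from my program: the names work_ready, shutting_down, jobs_done, wake and run_jobs are hypothetical, and the "work" is just a counter. The point is that the exit flag is part of the wait predicate and is only read or written while the lock is held, so the worker always reaches its exit path and pthread_join() returns.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical shared state, all protected by `lock`. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  wake = PTHREAD_COND_INITIALIZER;
static bool work_ready    = false;
static bool shutting_down = false;
static int  jobs_done     = 0;

static void *worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&lock);
    for (;;) {
        /* Sleep until there is a job or we are told to stop. */
        while (!work_ready && !shutting_down)
            pthread_cond_wait(&wake, &lock);
        if (work_ready) {
            work_ready = false;            /* "process" the job */
            jobs_done++;
            pthread_cond_broadcast(&wake); /* let the submitter post the next one */
            continue;
        }
        break;  /* shutting_down is set and no job is pending */
    }
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Posts n jobs one at a time, then shuts the worker down.
   Intended to be called once, since the flags are never reset. */
int run_jobs(int n) {
    pthread_t t;
    pthread_create(&t, NULL, worker, NULL);
    for (int i = 0; i < n; i++) {
        pthread_mutex_lock(&lock);
        while (work_ready)                 /* wait until the slot is free */
            pthread_cond_wait(&wake, &lock);
        work_ready = true;
        pthread_cond_broadcast(&wake);
        pthread_mutex_unlock(&lock);
    }
    pthread_mutex_lock(&lock);
    shutting_down = true;                  /* state change made under the lock */
    pthread_cond_broadcast(&wake);
    pthread_mutex_unlock(&lock);
    pthread_join(t, NULL);                 /* returns because the worker can exit */
    return jobs_done;
}
```

Calling run_jobs(5) from main() should process all five jobs before the worker exits, because a pending job is always handled before the shutdown flag is honoured.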

Upvotes: 0

David Schwartz

Reputation: 182759

Your producer only signals we_have_data. But since it also sets buffer_is_empty to false, it can make the consume_from_the_consumer thread ready to go without unblocking it, because that thread is blocked on a different condition variable.

To make your life simpler, I'd suggest two changes:

  1. Always use pthread_cond_broadcast.
  2. Only use one condition variable.

This may be very slightly less efficient, but there are several entire categories of subtle bugs that it makes impossible.
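
As a rough sketch of what those two changes could look like for three threads: the struct pipeline, the stage enum, set_stage() and run_pipeline() below are names invented for this example, and the "even values need the second consumer" rule is a stand-in for whatever test your real data needs. Every state change broadcasts on the single condition variable, and each thread simply re-checks its own predicate when it wakes. For brevity the work is done while holding the lock.

```c
#include <pthread.h>

/* Hypothetical: which thread owns the shared slot right now. */
enum stage { NEED_DATA, HAVE_DATA, HAVE_SUBDATA, DONE };

struct pipeline {
    pthread_mutex_t lock;
    pthread_cond_t  changed;   /* the only condition variable */
    enum stage stage;
    int value;                 /* stand-in for the data buffer */
    int n_items, lines_seen, subdata_seen;
};

/* Must be called with p->lock held. */
static void set_stage(struct pipeline *p, enum stage s) {
    p->stage = s;
    pthread_cond_broadcast(&p->changed);   /* wake everyone; each re-checks */
}

static void *produce(void *arg) {
    struct pipeline *p = arg;
    for (int i = 1; i <= p->n_items + 1; i++) {
        pthread_mutex_lock(&p->lock);
        while (p->stage != NEED_DATA)
            pthread_cond_wait(&p->changed, &p->lock);
        if (i <= p->n_items) {
            p->value = i;                  /* "read" the next item */
            set_stage(p, HAVE_DATA);
        } else {
            set_stage(p, DONE);            /* end of input */
        }
        pthread_mutex_unlock(&p->lock);
    }
    return NULL;
}

static void *consume(void *arg) {
    struct pipeline *p = arg;
    for (;;) {
        pthread_mutex_lock(&p->lock);
        while (p->stage != HAVE_DATA && p->stage != DONE)
            pthread_cond_wait(&p->changed, &p->lock);
        if (p->stage == DONE) { pthread_mutex_unlock(&p->lock); return NULL; }
        p->lines_seen++;
        /* Assumed rule: even values get handed to the second consumer. */
        set_stage(p, (p->value % 2 == 0) ? HAVE_SUBDATA : NEED_DATA);
        pthread_mutex_unlock(&p->lock);
    }
}

static void *consume_subdata(void *arg) {
    struct pipeline *p = arg;
    for (;;) {
        pthread_mutex_lock(&p->lock);
        while (p->stage != HAVE_SUBDATA && p->stage != DONE)
            pthread_cond_wait(&p->changed, &p->lock);
        if (p->stage == DONE) { pthread_mutex_unlock(&p->lock); return NULL; }
        p->subdata_seen++;
        set_stage(p, NEED_DATA);           /* hand the slot back to the producer */
        pthread_mutex_unlock(&p->lock);
    }
}

int run_pipeline(int n_items, int *lines_seen, int *subdata_seen) {
    struct pipeline p = { .stage = NEED_DATA, .n_items = n_items };
    pthread_t t[3];
    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.changed, NULL);
    pthread_create(&t[0], NULL, produce, &p);
    pthread_create(&t[1], NULL, consume, &p);
    pthread_create(&t[2], NULL, consume_subdata, &p);
    for (int i = 0; i < 3; i++)
        pthread_join(t[i], NULL);
    pthread_cond_destroy(&p.changed);
    pthread_mutex_destroy(&p.lock);
    *lines_seen = p.lines_seen;
    *subdata_seen = p.subdata_seen;
    return 0;
}
```

Adding a fourth or fifth thread under this scheme means adding a stage value and a predicate, not another condition variable, so there is no way to signal the wrong one.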

Upvotes: 1
