Reputation: 96937
I currently have two threads in a producer-consumer setup that use pthread_cond_wait() and pthread_cond_signal() to alternate between reading in data and processing it.
Say I have a lock, two conditions, and a Boolean flag that states if the data buffer has data in it:
pthread_mutex_t lock;
pthread_cond_t we_have_data;
pthread_cond_t we_need_data;
bool buffer_is_empty = true;
I have a pthread_t that uses the following function to produce data (read data into a buffer):
static void* produce(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!buffer_is_empty) {
            pthread_cond_wait(&we_need_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // read some data into our buffer
        pthread_mutex_lock(&lock);
        buffer_is_empty = false;
        pthread_cond_signal(&we_have_data);
    }
}
Then I have a second pthread_t that uses the following code to consume that data, upon receiving the we_have_data signal:
static void* consume(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (buffer_is_empty) {
            pthread_cond_wait(&we_have_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // process the data in our buffer
        pthread_mutex_lock(&lock);
        buffer_is_empty = true;
        pthread_cond_signal(&we_need_data);
    }
}
This works correctly.
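For reference, here is a minimal sketch of the same two-thread handoff that also terminates, which makes it easier to test. The no_more_data flag, the int buffer, the item counter, and the run_handoff() helper are assumptions added for illustration; they are not part of my real program, and the processing step is reduced to adding numbers.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t we_have_data = PTHREAD_COND_INITIALIZER;
static pthread_cond_t we_need_data = PTHREAD_COND_INITIALIZER;
static bool buffer_is_empty = true;
static bool no_more_data = false; /* hypothetical end-of-input flag */
static int buffer;                /* stands in for the real data buffer */
static int item_count;            /* how many items the producer creates */
static long total;                /* what the consumer computed */

static void *produce(void *arg) {
    (void)arg;
    pthread_mutex_lock(&lock);
    for (int i = 1; i <= item_count; i++) {
        while (!buffer_is_empty)
            pthread_cond_wait(&we_need_data, &lock);
        buffer = i; /* "read some data into our buffer" */
        buffer_is_empty = false;
        pthread_cond_signal(&we_have_data);
    }
    no_more_data = true; /* tell the consumer nothing else is coming */
    pthread_cond_signal(&we_have_data);
    pthread_mutex_unlock(&lock);
    return NULL;
}

static void *consume(void *arg) {
    (void)arg;
    pthread_mutex_lock(&lock);
    for (;;) {
        while (buffer_is_empty && !no_more_data)
            pthread_cond_wait(&we_have_data, &lock);
        if (buffer_is_empty)
            break; /* input finished and nothing left to process */
        total += buffer; /* "process the data in our buffer" */
        buffer_is_empty = true;
        pthread_cond_signal(&we_need_data);
    }
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Runs one producer and one consumer over n items; returns the sum seen. */
long run_handoff(int n) {
    pthread_t producer, consumer;
    buffer_is_empty = true;
    no_more_data = false;
    item_count = n;
    total = 0;
    pthread_create(&producer, NULL, produce, NULL);
    pthread_create(&consumer, NULL, consume, NULL);
    pthread_join(producer, NULL);
    pthread_join(consumer, NULL);
    assert(buffer_is_empty); /* every produced item was consumed */
    return total;
}
```

Unlike my real code, this sketch does its work while holding the lock, which keeps it short. Calling run_handoff(5) from main() should hand the values 1 through 5 across one at a time.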
What I would now like to do is add a third thread that does further work on the data from the consume() function, if the buffer contains certain data.
I tried adding a third condition, but my program hangs.
I set up a condition and Boolean flag:
bool processing_with_second_consumer;
pthread_cond_t we_need_to_process_data_with_another_consumer;
Then I modify the consumer:
static void* consume(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (buffer_is_empty && !processing_with_second_consumer) {
            pthread_cond_wait(&we_have_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // process the data in our buffer
        pthread_mutex_lock(&lock);
        if (data_meets_our_conditions) {
            processing_with_second_consumer = true;
            pthread_cond_signal(&we_need_to_process_data_with_another_consumer);
        }
        buffer_is_empty = true;
        pthread_cond_signal(&we_need_data);
    }
}
And then I modify the producer to wait on the boolean:
static void* produce(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!buffer_is_empty && !processing_with_second_consumer) {
            pthread_cond_wait(&we_need_data, &lock);
        }
        pthread_mutex_unlock(&lock);
        // read some data into our buffer
        pthread_mutex_lock(&lock);
        buffer_is_empty = false;
        pthread_cond_signal(&we_have_data);
    }
}
And add a third thread to consume from the consumer:
static void* consume_from_the_consumer(void* arg) {
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!buffer_is_empty && processing_with_second_consumer) {
            pthread_cond_wait(&we_need_to_process_data_with_another_consumer, &lock);
        }
        pthread_mutex_unlock(&lock);
        // do more specific processing of the data in our buffer
        pthread_mutex_lock(&lock);
        processing_with_second_consumer = false;
    }
}
I can't seem to get the program to exit correctly — it basically hangs in an infinite loop on consuming from the consumer.
How does one correctly set up signaling with pthread conditions, in order to allow a third (or fourth, or fifth, etc.) thread?
Reputation: 96937
To solve this for three threads, I needed to make a few changes:
Three bool flags: is_new_line_available, is_new_subdata_available, and is_eof.
Three pthread_cond_t conditions: new_line_is_available, new_line_is_empty, and new_subdata_is_available.
When a thread reaches the end of the input, pthread_exit() is called to terminate that thread.
The produce thread:
static void* produce(void* arg) {
    for (;;) {
        pthread_mutex_lock(&lock);
        while (is_new_line_available) {
            pthread_cond_wait(&new_line_is_empty, &lock);
        }
        // ... read a line of data into buffer ...
        if (EOF) { // pseudocode for "end of input reached"; the EOF macro itself is always nonzero
            is_new_line_available = true;
            is_new_subdata_available = true;
            is_eof = false;
            // wake both downstream threads so they can notice the end of input
            pthread_cond_signal(&new_line_is_available);
            pthread_cond_signal(&new_subdata_is_available);
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }
        is_new_line_available = true;
        is_new_subdata_available = false;
        is_eof = false;
        pthread_cond_signal(&new_line_is_available);
        pthread_mutex_unlock(&lock);
    }
}
The consume thread:
static void* consume(void* arg) {
    for (;;) {
        pthread_mutex_lock(&lock);
        // sleep until the producer has published a new line
        while (!is_new_line_available) {
            pthread_cond_wait(&new_line_is_available, &lock);
        }
        // ... process line of data to look for subdata type ...
        if (EOF) { // pseudocode for "end of input reached"
            is_eof = true;
            pthread_cond_signal(&new_subdata_is_available);
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }
        else if (subdata_found) {
            is_new_subdata_available = true;
            is_new_line_available = false;
            pthread_cond_signal(&new_line_is_empty);
        }
        pthread_mutex_unlock(&lock);
    }
}
Then the third "subdata"-handling thread:
static void* consume_subdata_from_the_consumer(void* arg) {
    for (;;) {
        pthread_mutex_lock(&lock);
        // check is_eof while holding the lock, since other threads write it
        if (is_eof) {
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }
        while (!is_new_subdata_available) {
            pthread_cond_wait(&new_subdata_is_available, &lock);
        }
        // ... process subdata ...
        is_new_subdata_available = false;
        is_new_line_available = true;
        pthread_cond_signal(&new_line_is_available);
        pthread_mutex_unlock(&lock);
    }
}
Some observations:
Each thread needs to terminate with pthread_exit(), or the parent process will hang.
Use calloc(), for instance, to initialize a character buffer before using it in a thread.
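As a rough illustration of those two observations, the sketch below zero-initializes a character buffer with calloc() before handing it to a thread, has the thread finish through pthread_exit(), and has the caller collect it with pthread_join(). The names fill_line, read_line_in_thread, and LINE_CAPACITY, and the text written into the buffer, are made up for this example and are not from my program.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define LINE_CAPACITY 64 /* hypothetical buffer size */

/* Worker: writes into the buffer it was given, then terminates explicitly. */
static void *fill_line(void *arg) {
    char *line = arg;
    snprintf(line, LINE_CAPACITY, "%s", "line from worker");
    pthread_exit(line); /* this value is what pthread_join() receives */
}

/* Returns a heap buffer filled by a worker thread, or NULL on failure.
 * The caller owns the buffer and must free() it. */
char *read_line_in_thread(void) {
    /* calloc() zero-fills, so the buffer is a valid empty string
     * even before the worker has written anything. */
    char *line = calloc(LINE_CAPACITY, sizeof *line);
    if (line == NULL)
        return NULL;

    pthread_t worker;
    if (pthread_create(&worker, NULL, fill_line, line) != 0) {
        free(line);
        return NULL;
    }

    void *result = NULL;
    int rc = pthread_join(worker, &result); /* blocks until the worker exits */
    assert(rc == 0 && result == line);
    (void)rc;
    return line;
}
```

The parent blocks in pthread_join() until the worker has finished, so a worker that never reaches pthread_exit() (or a return) would leave the parent waiting indefinitely.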
Reputation: 182759
Your producer only signals we_have_data. But since it sets buffer_is_empty to false, it can make the consume_from_the_consumer thread ready to go without unblocking it, because that thread is blocked on a different condition variable (we_need_to_process_data_with_another_consumer).
To make your life simpler, I'd suggest two changes:
Use pthread_cond_broadcast.
This may be very slightly less efficient, but there are several entire categories of subtle bugs that it makes impossible.
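Here is a minimal sketch of what a broadcast-based version could look like for three threads. It goes one step further than the text above and assumes a single condition variable shared by all threads, plus a small state value saying whose turn it is; the enum, the struct, the helper names, and the "even values need the second consumer" rule are all invented for illustration. Every thread rechecks its own predicate after each wakeup, so a wakeup cannot be delivered to the wrong waiter and lost.

```c
#include <assert.h>
#include <pthread.h>

/* Whose turn it is to touch the buffer (hypothetical state names). */
enum stage { PRODUCER, CONSUMER, SECOND_CONSUMER, DONE };

struct pipeline {
    pthread_mutex_t lock;
    pthread_cond_t changed; /* the only condition variable */
    enum stage turn;
    int buffer;
    int total; /* number of items to produce */
    int consumed;
    int second_consumed;
};

struct pipeline_result { int consumed; int second_consumed; };

/* Caller holds the lock. Blocks until it is `me`'s turn or the run is over. */
static enum stage wait_for(struct pipeline *p, enum stage me) {
    while (p->turn != me && p->turn != DONE)
        pthread_cond_wait(&p->changed, &p->lock);
    return p->turn;
}

/* Caller holds the lock. Passes the buffer on and wakes every waiter. */
static void hand_to(struct pipeline *p, enum stage next) {
    p->turn = next;
    pthread_cond_broadcast(&p->changed);
}

static void *produce(void *arg) {
    struct pipeline *p = arg;
    pthread_mutex_lock(&p->lock);
    for (int i = 1; i <= p->total; i++) {
        wait_for(p, PRODUCER);
        p->buffer = i; /* stands in for reading data */
        hand_to(p, CONSUMER);
    }
    wait_for(p, PRODUCER); /* last item has been fully processed */
    hand_to(p, DONE);      /* lets both consumers leave their loops */
    pthread_mutex_unlock(&p->lock);
    return NULL;
}

static void *consume(void *arg) {
    struct pipeline *p = arg;
    pthread_mutex_lock(&p->lock);
    while (wait_for(p, CONSUMER) != DONE) {
        p->consumed++;
        /* Assumed rule: even values need the second consumer. */
        hand_to(p, p->buffer % 2 == 0 ? SECOND_CONSUMER : PRODUCER);
    }
    pthread_mutex_unlock(&p->lock);
    return NULL;
}

static void *consume_from_the_consumer(void *arg) {
    struct pipeline *p = arg;
    pthread_mutex_lock(&p->lock);
    while (wait_for(p, SECOND_CONSUMER) != DONE) {
        p->second_consumed++;
        hand_to(p, PRODUCER);
    }
    pthread_mutex_unlock(&p->lock);
    return NULL;
}

/* Runs the three threads over `total` items and reports what each saw. */
struct pipeline_result run_pipeline(int total) {
    struct pipeline p = { .turn = PRODUCER, .total = total };
    void *(*entry[3])(void *) = { produce, consume, consume_from_the_consumer };
    pthread_t threads[3];
    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.changed, NULL);
    for (int i = 0; i < 3; i++)
        pthread_create(&threads[i], NULL, entry[i], &p);
    for (int i = 0; i < 3; i++)
        pthread_join(threads[i], NULL);
    assert(p.turn == DONE);
    pthread_cond_destroy(&p.changed);
    pthread_mutex_destroy(&p.lock);
    return (struct pipeline_result){ p.consumed, p.second_consumed };
}
```

Adding a fourth or fifth thread in this style means adding another stage value and another loop of the same shape. The sketch keeps the lock held during the (trivial) processing step; real work could be done with the lock released, as long as the turn value is only changed under the lock.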