xXxJackFrostxXx

Reputation: 21

Unexpected pthread output

I am trying to write a basic program that creates "reader" and "writer" threads sharing a common resource, with readers given priority over writers whenever possible. I protect the resource with a mutex, but the code does not behave as I expected. I have traced the problem to a single line, but I cannot understand why it fails. The "bug" does not occur on every run, which is to be expected with threads, but when it does I can't find a proper explanation for it. Here is my program:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>   // for time(), used to seed srandom()
#include <unistd.h>

#define N 3 //Number of readers.
#define M 3  //Number of writers.
#define X 2  //Number of times a reader reads.
#define Y 2  //Number of times a writer writes.

pthread_mutex_t res_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t res_cond_read = PTHREAD_COND_INITIALIZER;
pthread_cond_t res_cond_write = PTHREAD_COND_INITIALIZER;
int resource = 0, state = 0 /*1 = writing, 0 = waiting, abs(<0) = #readers*/, reader_queue_length = 0;

void *reader_job (void* parg);
void *writer_job (void* parg);

int main ()
{       
    srandom((unsigned int)time(NULL));

    pthread_t readers[N];
    pthread_t writers[M];   
    int valueR[N], valueW[M];

    //printf ("NOTICE: Reader queue lengths excluding current thread.\n");
    for (int i=0; i<N; i++) {       
        valueR[i] = i;
    }

    for (int i=0; i<M; i++) {       
        valueW[i] = i;
    }   
    for (int i=0; i<N; i++) {       
        pthread_create (&readers[i], NULL, reader_job, &valueR[i]);                     
    }

    for (int i=0; i<M; i++) {
        pthread_create (&writers[i], NULL, writer_job, &valueW[i]);
    }

    for (int i=0; i<N; i++) {
        pthread_join (readers[i], NULL);        
    }

    for (int i=0; i<M; i++){
        pthread_join (writers[i], NULL);
    }
    return 0;
}

void *reader_job (void* parg) {
    int *temp = (int*) parg;
    int value = *temp;  
    int local_read_num = 0;
    printf (">>> Creating reader: %d\n", value);
    while (local_read_num < X) {
        usleep(1000 * (random() % (N+M)));              
        reader_queue_length++; //When this line is here, wrong.
        printf ("INFO::About to access entry mutex for reader with ID: %2d, with queue length %2d.\n",
            value, reader_queue_length);
        pthread_mutex_lock(&res_mutex);
            //reader_queue_length++; //When it is here, right.
            if (reader_queue_length == 0) printf ("\nERROR::Before entry mutex!\n\n");          
            while (state != 0)
                pthread_cond_wait (&res_cond_read, &res_mutex);
            state--;
            if (reader_queue_length == 0) printf ("\nERROR::During entry mutex!\n\n");
            reader_queue_length--;
            printf ("INFO::About to exit entry mutex for reader with ID: %2d, with queue length %2d.\n",
                value, reader_queue_length);
        pthread_mutex_unlock (&res_mutex);

        printf ("[READER (ID: %2d):] Readers in queue: %2d, read: %2d\n", value, reader_queue_length, resource);

        pthread_mutex_lock (&res_mutex);
            state++;
            pthread_cond_signal (&res_cond_read);
            if (reader_queue_length == 0) pthread_cond_signal (&res_cond_write);                        
        pthread_mutex_unlock (&res_mutex);


        local_read_num++;
    }
    printf ("!!! Destroying reader: %d\n", value);
    pthread_exit (NULL);
}

void *writer_job (void* parg) {
    int *temp = (int*) parg;
    int value = *temp;  
    int local_write_num = 0;
    printf (">>> Creating writer: %d\n", value);
    while (local_write_num < Y) {
        //usleep(1000 * (random() % (N+M)));
        pthread_mutex_lock(&res_mutex);
            while (state != 0 || reader_queue_length > 0)
                pthread_cond_wait (&res_cond_write, &res_mutex);    
            state++;
        pthread_mutex_unlock(&res_mutex);

        resource = value*5+local_write_num;
        printf ("[WRITER (ID: %2d):] Readers in queue: %2d, wrote: %2d\n", value, reader_queue_length, resource);

        pthread_mutex_lock (&res_mutex);
            state--;
        pthread_mutex_unlock(&res_mutex);

        pthread_cond_signal(&res_cond_read);
        pthread_cond_signal(&res_cond_write);
        local_write_num++;
    }
    printf ("!!! Destroying writer: %d\n", value);
    pthread_exit (NULL);
}

Looking at the line reader_queue_length++; in reader_job, I don't get why the program misbehaves when that line is outside the mutex but works when it is inside. The output confuses me even more: in this particular run (see below), the "bug" shows up at the very end, when every other thread has already printed its "Destroying" message, so no one should be left to alter my variable, right?

>>> Creating reader: 0
>>> Creating reader: 1
>>> Creating reader: 2
>>> Creating writer: 0
INFO::About to access entry mutex for reader with ID:  1, with queue length  1.
INFO::About to exit entry mutex for reader with ID:  1, with queue length  0.
[READER (ID:  1):] Readers in queue:  0, read:  0
>>> Creating writer: 2
>>> Creating writer: 1
[WRITER (ID:  0):] Readers in queue:  0, wrote:  0
[WRITER (ID:  0):] Readers in queue:  0, wrote:  1
!!! Destroying writer: 0
[WRITER (ID:  1):] Readers in queue:  0, wrote:  5
[WRITER (ID:  1):] Readers in queue:  0, wrote:  6
!!! Destroying writer: 1
[WRITER (ID:  2):] Readers in queue:  0, wrote: 10
[WRITER (ID:  2):] Readers in queue:  0, wrote: 11
!!! Destroying writer: 2
INFO::About to access entry mutex for reader with ID:  1, with queue length  1.
INFO::About to exit entry mutex for reader with ID:  1, with queue length  0.
[READER (ID:  1):] Readers in queue:  0, read: 11
!!! Destroying reader: 1
INFO::About to access entry mutex for reader with ID:  2, with queue length  1.
INFO::About to exit entry mutex for reader with ID:  2, with queue length  0.
[READER (ID:  2):] Readers in queue:  0, read: 11
INFO::About to access entry mutex for reader with ID:  0, with queue length  1.
INFO::About to exit entry mutex for reader with ID:  0, with queue length  0.
[READER (ID:  0):] Readers in queue:  0, read: 11
INFO::About to access entry mutex for reader with ID:  0, with queue length  1.
INFO::About to exit entry mutex for reader with ID:  0, with queue length  0.
[READER (ID:  0):] Readers in queue:  0, read: 11
!!! Destroying reader: 0
INFO::About to access entry mutex for reader with ID:  2, with queue length  1.

ERROR::Before entry mutex!


ERROR::During entry mutex!

INFO::About to exit entry mutex for reader with ID:  2, with queue length -1.
[READER (ID:  2):] Readers in queue: -1, read: 11
!!! Destroying reader: 2

How did the reader queue end up with a negative length? Which thread was left to alter it? All the others had already been destroyed, or were at least past the point in their run cycle where they could touch the variable!

FYI I am running this on Ubuntu 16.04 with the gcc compiler and -pthread flag.

Thank you all in advance!

Upvotes: 1

Views: 41

Answers (1)

xXxJackFrostxXx

Reputation: 21

Alright, I think this is the problem: when the queue length variable is modified outside the mutex, multiple threads can access it at the same time, which is a data race.

For example, reader_queue_length++ is not a single step: the thread loads the current value A, adds one, and stores A+1 back. If several threads load the same initial value A before any of them has stored its result, they all store A+1, and every increment but one is lost. The decrements, on the other hand, all happen inside the mutex and are all counted, so the variable can end up below zero. That is the "bug".
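To make the lost update concrete, here is a small single-threaded sketch that replays one possible interleaving by hand. The function name simulate_lost_increment and the reg_r0/reg_r2 variables are made up for illustration, and the ordering shown is only an assumed one that would be consistent with the -1 in the output, not something recovered from the actual run.

```c
#include <assert.h>

/* Replays one possible interleaving of two readers, step by step.
   Each "reg_" variable stands for the private copy a thread holds
   between loading the shared value and storing it back. */
int simulate_lost_increment(int queue_length) {
    assert(queue_length >= 0);   /* a queue cannot start out negative */

    int reg_r2 = queue_length;   /* reader 2 loads A ...                        */
    int reg_r0 = queue_length;   /* ... and reader 0 loads the same A           */
    queue_length = reg_r0 + 1;   /* reader 0 stores A+1                         */
    queue_length = reg_r2 + 1;   /* reader 2 stores A+1 too: one increment lost */

    queue_length--;              /* reader 0 decrements inside the mutex */
    queue_length--;              /* reader 2 decrements inside the mutex */

    return queue_length;         /* A-1 instead of A */
}
```

Calling simulate_lost_increment(0) returns -1: two increments were counted as one, while both decrements went through.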

Bottom line: that statement needs to be inside the mutex!
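As a minimal sketch of that fix, separate from the program in the question, the following increments a shared counter only while holding a mutex. All names here (demo_mutex, demo_queue_length, demo_worker, run_protected_counter) and the thread and iteration counts are hypothetical and chosen just for the demonstration.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_THREADS 4
#define DEMO_ITERS   100000

static pthread_mutex_t demo_mutex = PTHREAD_MUTEX_INITIALIZER;
static int demo_queue_length = 0;

/* Each worker increments the shared counter, always while holding the mutex. */
static void *demo_worker(void *arg) {
    (void)arg;
    for (int i = 0; i < DEMO_ITERS; i++) {
        pthread_mutex_lock(&demo_mutex);
        demo_queue_length++;     /* load, add, store can no longer interleave */
        pthread_mutex_unlock(&demo_mutex);
    }
    return NULL;
}

/* Starts the workers, waits for all of them, and returns the final count. */
int run_protected_counter(void) {
    pthread_t workers[DEMO_THREADS];
    demo_queue_length = 0;

    for (int i = 0; i < DEMO_THREADS; i++) {
        int rc = pthread_create(&workers[i], NULL, demo_worker, NULL);
        assert(rc == 0);
        (void)rc;                /* silences the unused warning under NDEBUG */
    }
    for (int i = 0; i < DEMO_THREADS; i++) {
        pthread_join(workers[i], NULL);
    }
    return demo_queue_length;
}
```

Built with gcc and the -pthread flag, run_protected_counter() should return DEMO_THREADS * DEMO_ITERS on every run. If the lock and unlock calls are removed, the result may come out lower on some runs, which is the same effect as in the question.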

Upvotes: 1
