Yos
Yos

Reputation: 1376

How to control pthreads with multiple mutexes and conditions?

In the code below I wrote a program to perform add/remove operations on an int array using multithreading. The condition is that multiple threads cannot make operations on the same cell, but parallel operations can be made on different cells.

I thought in order to implement such conditions I'd need to use multiple mutexes and condition variables, to be exact, as many as there're cells in the array. The initial value of all cells of my array is 10 and threads increment/decrement this value by 3.

The code below seems to work (the cell values of the array after all threads finished working is as expected) but I don't understand a few things:

  1. I first spawn adder threads which sleep for a second. In addition each thread has printf statement which is triggered if a thread waits. Remove threads don't sleep so I expect remove threads to invoke their printf statements because they must wait a second at least before adder threads finish their work. But remover threads never call printf.
  2. My second concern: as I mentioned I first spawn adder threads so I expect the cells value go from 10 to 13. Then if remover thread acquires lock the value can go from 13 to 10 OR if adder thread acquires the lock then the cell value will go from 13 to 16. But I don't see the behavior in printf statements inside threads. For example one of the printf sequences I had: add thread id and cell id 1: cell value 10->13, then remove thread id and cell id 1: cell value 10->7 then add thread id and cell id 1: cell value 10->13. This doesn't make sense. I made sure that the threads all point to the same array.

Bottom line I'd like to know whether my solution is correct and if yes why is the behavior I described occurring. If my solution is incorrect I'd appreciate example of correct solution or at least general direction.

This is the code (all the logic is in AdderThread, RemoveThread):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define ARR_LEN 5
#define THREADS_NUM 5
#define INIT_VAL 10
#define ADD_VAL 3
#define REMOVE_VAL 3
#define ADDER_LOOPS 2

typedef struct helper_t {
    int threadId;
    int * arr;
    int * stateArr; //0 if free, 1 if busy
} helper_t;

enum STATE {FREE, BUSY};
enum ERRORS {MUTEX, COND, CREATE, JOIN, LOCK, UNLOCK, WAIT, BROADCAST};

pthread_mutex_t mutexArr[THREADS_NUM];
pthread_cond_t condArr[THREADS_NUM];

void errorHandler(int errorId) {
    switch (errorId) {
        case MUTEX:
            printf("mutex error\n");
            break;
        case COND:
            printf("cond error\n");
            break;
        case CREATE:
            printf("create error\n");
            break;
        case JOIN:
            printf("join error\n");
            break;
        case LOCK:
            printf("lock error\n");
            break;
        case UNLOCK:
            printf("unlock error\n");
            break;
        case WAIT:
            printf("wait error\n");
            break;
        case BROADCAST:
            printf("broadcast error\n");
            break;
        default:
            printf("default switch\n");
            break;
    }
}

void mallocError() {
    printf("malloc error\nExiting app\n");
    exit(EXIT_FAILURE);
}

void initMutexesAndConds(pthread_mutex_t * mutexArr, pthread_cond_t * condArr) {
    int i;

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_mutex_init(&mutexArr[i], NULL);
        pthread_cond_init(&condArr[i], NULL);
    }
}

helper_t * initStructs(int * arr, int * stateArr) {
    int i;
    helper_t * helpers = (helper_t *) malloc(sizeof(helper_t) * THREADS_NUM);
    if(!helpers) {
        mallocError();
    } else {
        for(i = 0; i < THREADS_NUM; i++) {
            helpers[i].threadId = i;
            helpers[i].arr = arr;
            helpers[i].stateArr = stateArr;
        }
    }
    return helpers;
}

void printArr(int * arr, int len) {
    int i;
    for(i = 0; i < len; i++) {
        printf("%d, ", arr[i]);
    }
    printf("\n");
}

void * AdderThread(void * arg) {
    int i;
    helper_t * h = (helper_t *) arg;
    int id = h->threadId;
    for(i = 0; i < ADDER_LOOPS; i++) {
        pthread_mutex_t * mutex = &mutexArr[id];
        pthread_cond_t * cond = &condArr[id];
        if(pthread_mutex_lock(mutex)) {
            errorHandler(LOCK);
        }
        while(h->stateArr[id] == BUSY) {
            printf("adder id %d waiting...\n", id);
            if(pthread_cond_wait(cond, mutex)) {
                errorHandler(WAIT);
            }
        }
        h->stateArr[id] = BUSY;
        sleep(1);
        h->arr[id] = h->arr[id] + ADD_VAL;
        printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
        h->stateArr[id] = FREE;
        if(pthread_cond_broadcast(cond)) {
            errorHandler(BROADCAST);
        }
        if(pthread_mutex_unlock(mutex)) {
            errorHandler(UNLOCK);
        }
    }
    pthread_exit(NULL);
}

void * RemoveThread(void * arg) {
    helper_t * h = (helper_t *) arg;
    int id = h->threadId;
    pthread_mutex_t * mutex = &mutexArr[id];
    pthread_cond_t * cond = &condArr[id];
    if(pthread_mutex_lock(mutex)) {
        errorHandler(LOCK);
    }
    while(h->stateArr[id] == BUSY) {
        printf("remover id %d waiting...\n", id);
        if(pthread_cond_wait(cond, mutex)) {
            errorHandler(WAIT);
        }
    }
    h->stateArr[id] = BUSY;
    h->arr[id] = h->arr[id] - REMOVE_VAL;
    printf("remove thread id and cell id %d: cell value %d->%d\n", id, h->arr[id], h->arr[id]-ADD_VAL);
    h->stateArr[id] = FREE;
    if(pthread_cond_broadcast(cond)) {
        errorHandler(BROADCAST);
    }
    if(pthread_mutex_unlock(mutex)) {
        errorHandler(UNLOCK);
    }
    pthread_exit(NULL);
}

int main() {
    int i;
    helper_t * adderHelpers;
    helper_t * removeHelpers;
    pthread_t adders[THREADS_NUM];
    pthread_t removers[THREADS_NUM];
    int * arr = (int *) malloc(sizeof(int) * ARR_LEN);
    int * stateArr = (int *) malloc(sizeof(int) * ARR_LEN);
    if(!arr || !stateArr) {
        mallocError();
    }

    for(i = 0; i < ARR_LEN; i++) {
        arr[i] = INIT_VAL;
        stateArr[i] = FREE;
    }

    initMutexesAndConds(mutexArr, condArr);
    adderHelpers = initStructs(arr, stateArr);
    removeHelpers = initStructs(arr, stateArr);

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_create(&adders[i], NULL, AdderThread, &adderHelpers[i]);
        pthread_create(&removers[i], NULL, RemoveThread, &removeHelpers[i]);
    }

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_join(adders[i], NULL);
        pthread_join(removers[i], NULL);
    }

    printf("the results are:\n");
    printArr(arr, THREADS_NUM);
    printf("DONE.\n");

    return 0;
}

Upvotes: 1

Views: 2601

Answers (1)

mevets
mevets

Reputation: 10435

1) This code sequence in Addr:

   h->stateArr[id] = BUSY;
        sleep(1);
        h->arr[id] = h->arr[id] + ADD_VAL;
        printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
        h->stateArr[id] = FREE;

Is execute with the mutex locked; thus Remove would never get a chance to see the state as anything but FREE.

2) There is no guarantee that mutex ownership alternates (afaik), but at the very least, to properly co-ordinate threads you should never rely upon such an implementation detail. It is the difference between working and “happens to work”, which usually leads to “used to work”....

If you put the sleep() between the mutex unlock and mutex lock, you might have a better case, but as it is, it just unlocks it then locks it again, so the system is well within its rights to just let it continue executing.

[ I ran out of space in comments ... ]:

Yes, the condition variables are doing nothing for you here. The idea of a condition variable is to be able to be notified when a significant event, such as a state change, has occurred on some shared objection.

For example, a reservoir might have a single condition variable for the water level. Multiplexed onto that might be many conditions: level < 1m; level > 5m; level > 10m. To keep the systems independent (thus working), the bit that updates the level might just:

pthread_mutex_lock(&levellock);
level = x;
pthread_cond_broadcast(&newlevel);
pthread_mutex_unlock(&levellock);

The actors implementing the conditions would do something like:

pthread_mutex_lock(&levellock);
while (1) {
    if (level is my conditions) {
         pthread_mutex_unlock(&levellock);
         alert the media
         pthread_mutex_lock(&levellock);
    }
    pthread_cond_wait(&newlevel, &levellock);
}

Thus I can add many “condition monitors” without breaking the level setting code, or the overall system. Many is finite, but by releasing the mutex while I alert the media, I avoid having my water monitoring system rely on the alarm handling.

If you are familiar with “publish/subscribe”, you might find this familiar. This is fundamentally the same model, just the PS hides a pile of details.

Upvotes: 1

Related Questions