Artotim

Reputation: 85

Implement barrier with pthreads in C

I'm trying to parallelize a merge-sort algorithm. What I'm doing is dividing the input array among the threads, then merging the threads' results. The way I'm trying to merge the results is something like this:

thread 0                     |   thread 1        |   thread 2         |   thread 3

sort(A0)                     |   sort(A1)        |   sort(A2)         | sort(A3)
merge(A0,A1)                 |                   |   merge(A2,A3)     | 
merge(A0A1, A2A3)            |                   |                    |

So, at the end of my function sortManager I call the function mergeThreadResults, which should implement the above logic. In it I iterate over pairs to merge the corresponding threads' results. Then, if needed, I merge the remaining items onto thread 0. It looks like this:

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %d\n", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(3); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 && nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}

It appears to work as intended. The problem is that I'm using a sleep call to synchronize the threads, which is far from the best approach. So I'm trying to implement a barrier with pthreads.
In it I calculate how many threads will take part in the current cycle and pass that count as the breakpoint. When all the threads reach the same point I release the merge function and wait again in the next cycle. This is what I've tried:

        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

But it's not working as intended. Some merges trigger before the last cycle has fully ended, leaving me with a partially sorted array.

This is a minimal example of my code for testing:

#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>

#include <pthread.h>
#include <unistd.h>

// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;

struct ThreadTask {
    long rank;
    int size;
    int threads;
};

void merge(int arr[], int left, int mid, int right) {
    /* Merge arrays */

    int i, j, k;
    int n1 = mid - left + 1;
    int n2 = right - mid;

    // Allocate temp arrays
    int *L = malloc((n1 + 2) * sizeof(int));
    int *R = malloc((n2 + 2) * sizeof(int));
    if (L == NULL || R == NULL) {
        fprintf(stderr, "Fatal: failed to allocate memory for temp arrays.\n");
        exit(EXIT_FAILURE);
    }

    // Populate temp arrays
    for (i = 1; i <= n1; i++) {
        L[i] = arr[left + i - 1];
    }
    for (j = 1; j <= n2; j++) {
        R[j] = arr[mid + j];
    }

    L[n1 + 1] = INT_MAX;
    R[n2 + 1] = INT_MAX;
    i = 1;
    j = 1;

    // Merge arrays
    for (k = left; k <= right; k++) {
        if (L[i] <= R[j]) {
            arr[k] = L[i];
            i++;
        } else {
            arr[k] = R[j];
            j++;
        }
    }

    free(L);
    free(R);
}


void mergeSort(int arr[], int left, int right) {
    /* Sort and then merge arrays */

    if (left < right) {
        int mid = left + (right - left) / 2;

        mergeSort(arr, left, mid);
        mergeSort(arr, mid + 1, right);

        merge(arr, left, mid, right);
    }
}


void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %d\n", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        // barrier
        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(2); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 && nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}

void *sortManager(void *threadInfo) {
    /* Manage mergeSort between threads */

    struct ThreadTask *currentTask = threadInfo;

    // Get task arguments
    long rank = currentTask->rank;
    int left = rank * ((float)currentTask->size / (float)currentTask->threads);
    int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
    int mid = left + (right - left) / 2;

    // Execute merge for task division
    if (left < right) {
        mergeSort(sortingArray, left, mid);
        mergeSort(sortingArray, mid + 1, right);
        merge(sortingArray, left, mid, right);
    }

    // Merge thread results
    if (rank % 2 == 0)  {
        mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
    }

    return 0;
}


struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
    /* Create threads with each task info */

    struct ThreadTask *threadTask;

    for (long thread = 0; thread < threads; thread++){
        threadTask = &tasksHolder[thread];
        threadTask->rank = thread;
        threadTask->size = size;
        threadTask->threads = threads;

        pthread_create(&thread_handles[thread], NULL, sortManager, (void*) threadTask);
    }

    return tasksHolder;
}


void printArray(int arr[], int size) {
    /* Print array */

    for (int arrayIndex = 0; arrayIndex < size; arrayIndex++)
        printf("%d ", arr[arrayIndex]);
    printf("\n");
}


int main(int argc, char *argv[]) {

    // Initialize arguments
    int arraySize = 20;
    int totalThreads = 16;

    
    // Display input
    printf("\nInput array:\n");
    printArray(sortingArray, arraySize);
    

    // Initialize threads
    pthread_t *thread_handles;
    thread_handles = malloc(totalThreads * sizeof(pthread_t));

    // Create threads
    struct ThreadTask threadTasksHolder[totalThreads];
    threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
    
    // Execute merge sort in each thread
    for (long thread = 0; thread < totalThreads; thread++) {
        pthread_join(thread_handles[thread], NULL);
    }
    free(thread_handles);
    

    // Display output
    printf("\nSorted array:\n");
    printArray(sortingArray, arraySize);
    
    return 0;
}

Upvotes: 3

Views: 965

Answers (2)

Arthur Pereira

Reputation: 1559

As @John Bollinger said, your approach looks unnecessarily difficult, and a solution would be equally complicated. But if you want to implement a barrier, I would suggest you put it after the merge in mergeThreadResults. That way you wait for all the threads doing work on that cycle to finish before moving to the next one.

To create this you will need a new barrier for every iteration, because at each cycle the number of threads performing merges decreases. So start by declaring some globals:

int mergeCycleFlag = 0;
pthread_mutex_t mutex;
pthread_barrier_t *mergeBarrier;

The flag is used to detect each new iteration, and we need a separate mergeBarrier for each cycle. Don't forget to allocate the array in your main function; it is indexed by group size, so threads + 1 entries are enough: mergeBarrier = malloc((threads + 1) * sizeof(pthread_barrier_t));

Then we can create a barrier like this:

        pthread_mutex_lock(&mutex);
        if (mergeCycleFlag != iter) { 
            mergeCycleFlag = iter;
            int mergesInLoop = threads % iter == 0 ? threads/iter : threads/iter+1;
            pthread_barrier_init(&mergeBarrier[iter], NULL, mergesInLoop);
        }
        pthread_mutex_unlock(&mutex);

        ... MERGE ...

        // Wait for everyone to finish merging
        pthread_barrier_wait (&mergeBarrier[iter]);

Note that I use a lock to create the barrier, because we don't want two threads initializing it at the same time. If there is no barrier set for this iteration, we create one with the number of threads that should work now. Also, I've changed your breakpoint statement to calculate how many threads are expected to perform a merge in this step.

After some adjustment, this is what your mergeThreadResults should look like:

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
    
    int nextThread, nextThreadRight;
    int groupSize = 2;

    while (groupSize <= threads) {
        if (myRank % groupSize != 0) { // Release threads that no longer perform merges
            break;
        }

        nextThread = (myRank+1*groupSize) < threads ? (myRank+1*groupSize) : threads;
        nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
 
        printf("Merging threads %ld to %d\n", myRank, nextThread-1);

        // Init barrier with number of threads you will wait merging 
        pthread_mutex_lock(&mutex);  // Just one thread can set the barrier
        if (mergeCycleFlag != groupSize) { 
            mergeCycleFlag = groupSize;
            int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize+1; // Calculate threads working in this step
            pthread_barrier_init(&mergeBarrier[groupSize], NULL, mergesInLoop);  // set barrier
        }
        pthread_mutex_unlock(&mutex);

        // Merge thread group with neighbour group
        merge(sortingArray, myLeft, myRight, nextThreadRight);

        // Wait for everyone to finish merging
        pthread_barrier_wait (&mergeBarrier[groupSize]);

        myRight = nextThreadRight;
        groupSize = groupSize * 2;
    }

    // Merge thread 0
    if (myRank == 0 && nextThread < threads-1) {
        nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
    }
}

Finally, for this solution to work, every thread must have finished its job before you merge results. So you need to either call it after your join in main, or implement another barrier with all threads before the call to mergeThreadResults in sortManager.

Also, an even better approach would be for each thread to wait only for the threads it will merge with: thread 0 waits only for 1, then for 2, and so on.

Upvotes: 1

John Bollinger

Reputation: 180351

I'm trying to parallelize a merge-sort algorithm. What I'm doing is dividing the input array for each thread, then merging the threads results.

Ok, but yours is an unnecessarily difficult approach. At each step of the merge process, you want half of your threads to wait for the other half to finish, and the most natural way for one thread to wait for another to finish is to use pthread_join(). If you wanted all of your threads to continue with more work after synchronizing then that would be different, but in this case, those that are not responsible for any more merges have nothing at all left to do.

This is what I've tried:

        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

There are several problems with that, but the biggest is that a barrier is the wrong tool for the job. After a barrier is passed, all the threads that were blocked at it proceed. You want half of the threads to proceed, performing merges, but the others (should) have no more work to do. Your computation of breakpoint assumes that that second half will not return to the barrier, which indeed they should not do. If you insist on using a barrier then the threads that have no merge to perform should terminate after passing through the barrier.

Moreover, it is incorrect to start iter at 2. If you use a barrier approach then all the threads active at each iteration must reach the barrier before any proceed, but if iter starts at 2 then on the first iteration, only half of all the threads must reach the barrier before it is passed.

Additionally, your CV use is non-idiomatic and susceptible to problems. None of the documented failure reasons for pthread_cond_wait() can be rescued by trying to wait again as you do, so you probably need to terminate the program on error, instead. Note also that pthread_mutex_lock(), pthread_mutex_unlock(), and pthread_cond_broadcast() all may fail, too.

On the other hand, CVs are susceptible to (very rare) spurious wakeups, so on successful return from a wait you need to check the condition again before proceeding, and possibly wait again. Note that the condition cannot be counter itself: the last thread to arrive resets counter to zero, so a waiter that re-checked counter < breakpoint would block forever. A generation count that the last arriver bumps expresses the condition correctly. Something more like this, with one added global, unsigned generation = 0;:

        if (pthread_mutex_lock(&mutex) != 0) {
            perror("pthread_mutex_lock");
            abort();
        }
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if (counter >= breakpoint) {
            counter = 0;
            generation++;  // opens the barrier for this cycle's waiters
            if (pthread_cond_broadcast(&cond_var) != 0) {
                perror("pthread_cond_broadcast");
                abort();
            }
        } else {
            unsigned my_generation = generation;
            do {
                if (pthread_cond_wait(&cond_var, &mutex) != 0) {
                    perror("pthread_cond_wait");
                    abort();
                }
            } while (generation == my_generation);
        }
        if (pthread_mutex_unlock(&mutex) != 0) {
            perror("pthread_mutex_unlock");
            abort();
        }

        // some threads must terminate at this point

Upvotes: 1
