b20000
b20000

Reputation: 1005

parallelize data processing on multiple cores using pthreads

My goal is to process data on multiple cores by using multiple worker threads, and then further process the results in a master thread. I'm working on linux and I'd like to use pthreads. I'm created a simple example to learn how to do this properly. I have one main thread named "callback" and 4 worker threads. The idea is that the main thread signals the worker threads to start processing, and the 4 threads then signal the main thread when they are done, and the main thread quits after it has been notified by all 4 threads that they are done. I want the 4 worker threads to be able to run in parallel, so I don't want any of those threads waiting for the others. In my example I've tried to just let each of the threads sleep for a different duration (1, 2, 3 and 4 seconds), with the idea that the code would exit after 4 seconds (i.e. when worker thread 4 has done waiting 4 seconds).

For some reason, my code is incorrect, and it always exits immediately, printing this:

thread 3 start (sleeping 3000 ms)
thread 2 start (sleeping 2000 ms)
thread 1 start (sleeping 1000 ms)
thread 4 start (sleeping 4000 ms)
    thread 1 stop
    thread 2 stop
    thread 3 stop
    thread 4 stop
Main(): Waited on 5  threads. Done.

So the threads do seem to exit in the correct order but the program doesn't take 4 seconds to run.

What is going on here? I've pasted the code below

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

// based on https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

#define DUR 1000
#define NUM_THREADS 5
int state = 0; 

pthread_mutex_t mutex;
pthread_cond_t conddone;
pthread_cond_t condwork;

void* callback(void* t) { 

    // signal worker threads to start work 

        pthread_mutex_lock(&mutex);
        pthread_cond_broadcast(&condwork);
        pthread_mutex_unlock(&mutex);

    // wait for worker threads to finish 

        pthread_mutex_lock(&mutex);
        while (state < 4)
            pthread_cond_wait(&conddone, &mutex);
        pthread_mutex_unlock(&mutex);

   pthread_exit(NULL);

} 

void* worker(void* t) { 

   long id = (long)t;

    // wait for signal from callback to start doing work 

    pthread_mutex_lock(&mutex);
    pthread_cond_wait(&condwork, &mutex);
    pthread_mutex_unlock(&mutex);

    // do work 

    printf("thread %d start (sleeping %d ms)\n", id, id * DUR); 
    usleep(id * DUR); 
    printf("    thread %d stop\n", id); 

    // tell callback we're done 

    pthread_mutex_lock(&mutex);
    state++;
    pthread_cond_signal(&conddone);
    pthread_mutex_unlock(&mutex);

   pthread_exit(NULL);

}


int main (int argc, char *argv[])
{
   int i, rc;

   pthread_t threads[5];
   pthread_attr_t attr;

   pthread_mutex_init(&mutex, NULL);
   pthread_cond_init (&condwork, NULL);
   pthread_cond_init (&conddone, NULL);

   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   pthread_create(&threads[0], &attr, callback, (void *)0);
   pthread_create(&threads[1], &attr, worker, (void *)1);
   pthread_create(&threads[2], &attr, worker, (void *)2);
   pthread_create(&threads[3], &attr, worker, (void *)3);
   pthread_create(&threads[4], &attr, worker, (void *)4);

   for (i=0; i<NUM_THREADS; i++) {
     pthread_join(threads[i], NULL);
   }
   printf ("Main(): Waited on %d  threads. Done.\n", NUM_THREADS);

   pthread_attr_destroy(&attr);
   pthread_mutex_destroy(&mutex);
   pthread_cond_destroy(&condwork);
   pthread_cond_destroy(&conddone);
   pthread_exit(NULL);
} 

Upvotes: 0

Views: 69

Answers (1)

caf
caf

Reputation: 239181

Your immediate problem is just that usleep() sleeps for microseconds not milliseconds so your threads are sleeping for a thousandth of the time that you intend them to.

You do have another problem, though: your condwork condition variable is not paired with a predicate over shared state (eg the predicate state < 4 for the conddone variable). If one of your worker threads executes the pthread_cond_wait() after the "callback" thread has executed the pthread_cond_broadcast(), the worker will wait indefinitely.

You could fix this by initialising the state variable to -1:

int state = -1;

and having your workers wait on the predicate state < 0:

// wait for signal from callback to start doing work 

pthread_mutex_lock(&mutex);
while (state < 0)
    pthread_cond_wait(&condwork, &mutex);
pthread_mutex_unlock(&mutex);

and having the "callback" signal the workers by setting state to 0:

// signal worker threads to start work 

pthread_mutex_lock(&mutex);
state = 0;
pthread_cond_broadcast(&condwork);
pthread_mutex_unlock(&mutex);

Upvotes: 1

Related Questions