Micheal XIV
Micheal XIV

Reputation: 515

Pthread signalling in C Linux

I am working with multi-threading in Linux using Pthread.

Thread1 waits for an IRQ from Driver by polling a character device file (my driver has ISR to catch IRQ from HW).

IRQ -----> Thread1 |-----> Thread2 |-----> Thread3 |-----> Thread4

Whenever Thread1 gets an IRQ, I want send a signal to Thread2, Thread3 and Thread4 to wake them up and then work.

Now, I am trying to use "pthread conditional variable" and "pthread mutex". But it seems that is not good approach.

What is efficient way for synchronization in this case? Please help.

Thank you very much.

Upvotes: 0

Views: 438

Answers (2)

mevets
mevets

Reputation: 10445

I like jeremy's answer, but it does have some lacking in that the interrupt dispatcher needs to know how many semaphores to increment on each interrupt. Also each increment is potentially a kernel call, so you have a lot of kernel calls for each interrupt.

An alternate is to understand how pthread_cond_broadcast() works. I have put an example below:

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>

#ifndef NTHREAD
#define NTHREAD 5
#endif

pthread_mutex_t Lock;
pthread_cond_t  CV;
int GlobalCount;
int Done;

#define X(y)   do { if (y == -1) abort(); } while (0)

void *handler(void *x) {
    unsigned icount;

    X(pthread_mutex_lock(&Lock));
    icount = 0;
    while (!Done) {
        if (icount < GlobalCount) {
            X(pthread_mutex_unlock(&Lock));
            icount++;
            X(pthread_mutex_lock(&Lock));
        } else {
            X(pthread_cond_wait(&CV, &Lock));
        }
    }
    X(pthread_mutex_unlock(&Lock));
    return NULL;
}

int 
main()
{
    X(pthread_mutex_init(&Lock, NULL));
    X(pthread_cond_init(&CV, NULL));
    pthread_t id[NTHREAD];
    int i;
    for (i = 0; i < NTHREAD; i++) {
        X(pthread_create(id+i, NULL, handler, NULL));
    }
    int c;
    while ((c = getchar()) != EOF) {
        X(pthread_mutex_lock(&Lock));
        GlobalCount++;
        X(pthread_mutex_unlock(&Lock));
        X(pthread_cond_broadcast(&CV));
    }

    X(pthread_mutex_lock(&Lock));
    Done = 1;
    X(pthread_cond_broadcast(&CV));
    X(pthread_mutex_unlock(&Lock));
    for (i = 0; i < NTHREAD; i++) {
        X(pthread_join(id[i], NULL));
    }
    return 0;
}

Upvotes: 0

Jeremy Friesner
Jeremy Friesner

Reputation: 73304

As I understand it, your problem is that your child threads (Threads 2 through 4) don't always wake up exactly once for every IRQ that Thread1 receives -- in particular, it might be that an IRQ is received while the child threads are already awake and working on an earlier IRQ, and that causes them not to be awoken for the new IRQ.

If that's correct, then I think a simple solution is to use a counting semaphore for each child-thread, rather than a condition variable. A semaphore is a simple data structure that maintains an integer counter, and supplies two operations, wait/P and signal/V. wait/P decrements the counter, and if the counter's new value is negative, it blocks until the counter has become non-negative again. signal/V increments the counter, and in the case where the counter was negative before the increment, awakens a waiting thread (if one was blocked inside wait/P).

The effect of this is that in the case where your main thread gets multiple IRQs in quick succession, the semaphore will "remember" the multiple signal/V calls (as a positive integer value of the counter), and allow the worker-thread to call wait/P that-many times in the future without blocking. That way no signals are ever "forgotten".

Linux supplies a semaphore API (via sem_init(), etc), but it's designed for inter-process synchronization and is therefore a little bit heavy-weight for synchronizing threads within a single process. Fortunately, it's easy to implement your own semaphore using a pthreads mutex and condition-variable, as shown below.

Note that in this toy example, the main() thread is playing the part of Thread1, and it will pretend to have received an IRQ every time you press return in the terminal window. The child threads are playing the part of Threads2-4, and they will pretend to do one second's worth of "work" every time Thread1 signals them. In particular note that if you press return multiple times in quick succession, the child threads will always do that many "work units", even though they can only perform one work-unit per second.

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

struct example_semaphore
{
   pthread_cond_t cond;
   pthread_mutex_t mutex;
   int count;  // acccess to this is serialized by locking (mutex)
};

// Initializes the example_semaphore (to be called at startup)
void Init_example_semaphore(struct example_semaphore * s)
{
   s->count = 0;
   pthread_mutex_init(&s->mutex, NULL);
   pthread_cond_init(&s->cond, NULL);
}

// V:  Increments the example_semaphore's count by 1.  If the pre-increment
//     value was negative, wakes a process that was waiting on the
//     example_semaphore
void Signal_example_semaphore(struct example_semaphore * s)
{
   pthread_mutex_lock(&s->mutex);
   if (s->count++ < 0) pthread_cond_signal(&s->cond);
   pthread_mutex_unlock(&s->mutex);
}

// P:  Decrements the example_semaphore's count by 1.  If the new value of the
//     example_semaphore is negative, blocks the caller until another thread calls
//     Signal_example_semaphore()
void Wait_example_semaphore(struct example_semaphore * s)
{
   pthread_mutex_lock(&s->mutex);
   while(--s->count < 0)
   {
      pthread_cond_wait(&s->cond, &s->mutex);
      if (s->count >= 0) break;
   }
   pthread_mutex_unlock(&s->mutex);
}

// This is the function that the worker-threads run
void * WorkerThreadFunc(void * arg)
{
   int workUnit = 0;
   struct example_semaphore * my_semaphore = (struct example_semaphore *) arg;
   while(1)
   {
      Wait_example_semaphore(my_semaphore);  // wait here until it's time to work
      printf("Thread %p: just woke up and is working on work-unit #%i...\n", my_semaphore, workUnit++);
      sleep(1);  // actual work would happen here in a real program
   }
}

static const int NUM_THREADS = 3;

int main(int argc, char ** argv)
{
   struct example_semaphore semaphores[NUM_THREADS];
   pthread_t worker_threads[NUM_THREADS];

   // Setup semaphores and spawn worker threads
   int i = 0;
   for (i=0; i<NUM_THREADS; i++)
   {
      Init_example_semaphore(&semaphores[i]);
      pthread_create(&worker_threads[i], NULL, WorkerThreadFunc, &semaphores[i]);
   }

   // Now we'll pretend to be receiving IRQs.  We'll pretent to
   // get one IRQ each time you press return.
   while(1)
   {
      char buf[128];
      fgets(buf, sizeof(buf), stdin);
      printf("Main thread got IRQ, signalling child threads now!\n");
      for (i=0; i<NUM_THREADS; i++) Signal_example_semaphore(&semaphores[i]);
   }
}

Upvotes: 1

Related Questions