Maxwell Powell
Maxwell Powell

Reputation: 143

Consumer/Producer multithread synchronization error

I've written a program that consists of four threads. The first for initializing the others, the second for producing items and adding them to a buffer, the third for adding items to another buffer, and a fourth for consuming items from either buffer. My problem is that the fourth thread is reading from buffer slots before they have been written to. I have placed semaphores in an attempt to synchronize but they are not performing as I would expect.

I've simplified my code while maintaining the error:

#include <stdlib.h>
#include <stdio.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/types.h>

/* thread routines */
void* threadA ();
void* threadB ();
void* threadC ();

/* thread ids */
pthread_t tid[3];

/* semaphores */
sem_t sA;
sem_t sB;
sem_t a_empty;
sem_t b_empty;
sem_t both_full;
sem_t mutex;

/* buffers */
int buffA[20];
int buffB[40];

/* int a for tracking buffA */
int a = 0;

int main()
{

/* initializing semaphores */
  sem_init (&a_empty, 20, 1);
  sem_init (&b_empty, 40, 1);
  sem_init (&both_full, 0, 1);
  sem_init (&mutex, 1, 1);
  sem_init (&sA, 0, 1);
  sem_init (&sB, 0, 1);

/* creating threads */
  pthread_create(&tid[0], NULL, threadA, NULL);
  pthread_create(&tid[1], NULL, threadB, NULL);
  pthread_create(&tid[2], NULL, threadC, NULL);

/* waiting on threads */
  pthread_join(tid[0], NULL);
  pthread_join(tid[1], NULL);
  pthread_join(tid[2], NULL);
}

/* Producer */
void* threadA ()
{
 int i;
 int inA = 0;

/* two process two way synch*/
 sem_post(&sB);
 sem_wait(&sA);

 for (i = 1; i <= 300; i++)
 {

/* adding item to buffer */
   sem_wait(&a_empty);
   buffA[inA] = i;

/* incrementing a */
   sem_wait(&mutex);
   a = a + 1;
   sem_post(&mutex);

/* signaling full buffer */
   sem_post(&both_full);

/* updating inA */
   inA = (inA + 1) % 20;

 }

 pthread_exit(NULL);
}

/* Producer */
void* threadB ()
{
 int i, j, inB;
 inB = 0;

/* two process two way synch*/
 sem_post(&sA);
 sem_wait(&sB);

 for(i = 1; i <= 400; i++)
 {

/* producing item. */ 
   sem_wait(&b_empty);
   buffB[inB] = i;

   sem_post(&both_full);

   inB = (inB + 1) % 40;
 }

 pthread_exit(NULL);
}

/* Consumer */
void* threadC ()
{
 int i, j, outA, outB, bsum, asum;
 outA = 0;
 outB = 0;
 asum = 0;
 bsum = 0;
 int prod_a;
 int prod_b;

 for(i = 0; i < 700; i++)
 {

  sem_wait(&both_full);
  if (a > 0) {
    asum++;
    prod_a = buffA[outA];
    outA = (outA + 1) % 20;
    sem_wait(&mutex);
    a = a - 1;
    sem_post(&mutex);
    sem_post(&a_empty);

    printf("A-%d\t", prod_a);
  } 
  else {
    bsum++;

    prod_b = buffB[outB];
    outB = (outB + 1) % 40;  
    sem_post(&b_empty);

    printf("B-%d\t", prod_b);
  }
 }

 printf("\n a=%d b=%d \n", asum, bsum);
 pthread_exit(NULL);
}

My problem seems to be that the else statement in threadC() is being executed when it should not be. I cannot pinpoint what would cause this.

EDIT: I am using gcc file.c -o file.o -lrt -lpthread to compile

Upvotes: 1

Views: 71

Answers (1)

Luis de Arquer
Luis de Arquer

Reputation: 387

Semaphore both_full should be initialized with value 0:

sem_init (&both_full, 0, 0);

Normally, outB should always be equal or smaller than inB. But with both_full initialized to 1, the else statement in threadC() is run at some point with no input available, allowing outB to be increased up to inB+1. So for the first 40 times, you may see zeroes (static buffers are zeroed at program start), and thereafter you may see old values, like B-51 B-52 B-13 B-54

Upvotes: 1

Related Questions