Reputation: 67
I'm currently learning how to use mutexes and Semaphores for multithreading in linux using pthreads, and I've been working on an Implementation of mutlti-producer/one-Consumer problem using only two binary semaphores and a mutex to synck access to bounded buffer, but the program doesn't work as planned
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <math.h>
#include <semaphore.h>
#define QUEUESIZE 20
#define LOOP 15
#define MAX_THREADS 10
void *producer (void *args);
void *consumer (void *args);
int lambda;
int queuesi;
typedef struct {
int buf[QUEUESIZE];
long head, tail;
int full, empty;
pthread_mutex_t *mut;
sem_t *sem_notFull, *sem_notEmpty;
} queue;
queue *queueInit (void);
void queueDelete (queue *q);
void queueAdd (queue *q, int in);
void queueDel (queue *q, int *out);
int factorial( int i);
int poissonfunction(int i);
int rando();
int main (int argc, char *argv[])
{
int max_pro,max_con,j,i;
queue *fifo;
pthread_t prod[MAX_THREADS],cons[MAX_THREADS];
max_con=1;
max_pro=4;
lambda=6;
queuesi=4;
fifo = queueInit ();
if (fifo == NULL) {
fprintf (stderr, "main: Queue Init failed.\n");
exit (1);
}
for(i=0; i<max_pro;i++)
pthread_create (&prod[i], NULL, producer,fifo);
for(j=0;j<max_con;j++)
pthread_create (&cons[j], NULL, consumer, fifo);
for(i=0; i<max_pro;i++)
pthread_join (prod[i], NULL);
for(j=0;j<max_con;j++)
pthread_join (cons[j], NULL);
queueDelete (fifo);
return 0;
}
void *producer (void *q)
{
queue *fifo;
int i,insert,sleep_time;
fifo = (queue *)q;
for (i = 0; i < LOOP; i++) {
//pthread_mutex_lock (fifo->mut);
while (fifo->full)
{
printf ("producer: queue FULL.\n");
sem_wait (fifo->sem_notFull);
}
pthread_mutex_lock (fifo->mut);
insert=rando();
queueAdd (fifo, insert);
printf("producer item number%d item produced %d\n",i,insert);
pthread_mutex_unlock (fifo->mut);
sem_post (fifo->sem_notEmpty);
sleep_time=poissonfunction(i);
usleep (sleep_time);
}
return (NULL);
}
void *consumer (void *q)
{
queue *fifo;
int i, d;
fifo = (queue *)q;
for (i = 0; i < LOOP; i++)
{
//pthread_mutex_lock (fifo->mut);
while (fifo->empty)
{
printf ("consumer: queue empty\n");
sem_wait (fifo->sem_notEmpty);
}
pthread_mutex_lock (fifo->mut);
queueDel (fifo, &d);
pthread_mutex_unlock (fifo->mut);
sem_post (fifo->sem_notFull);
printf ("consumer: recieved %d.\n", d);
usleep(2000);
}
return (NULL);
}
queue *queueInit (void)
{
queue *q;
q = (queue *)malloc (sizeof (queue));
if (q == NULL) return (NULL);
q->empty = 1;
q->full = 0;
q->head = 0;
q->tail = 0;
q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
pthread_mutex_init (q->mut, NULL);
q->sem_notFull = (sem_t *) malloc (sizeof (sem_t));
sem_init (q->sem_notFull, 0, 1);
q->sem_notEmpty = (sem_t *) malloc (sizeof (sem_t));
sem_init (q->sem_notEmpty, 0, 1);
return (q);
}
void queueDelete (queue *q)
{
pthread_mutex_destroy (q->mut);
free (q->mut);
sem_destroy (q->sem_notFull);
free (q->sem_notFull);
sem_destroy (q->sem_notEmpty);
free (q->sem_notEmpty);
free (q);
}
void queueAdd (queue *q, int in)
{
q->buf[q->tail] = in;
q->tail++;
if (q->tail == queuesi)
q->tail = 0;
if (q->tail == q->head)
q->full = 1;
q->empty = 0;
return;
}
void queueDel (queue *q, int *out)
{
*out = q->buf[q->head];
q->head++;
if (q->head == queuesi)
q->head = 0;
if (q->head == q->tail)
q->empty = 1;
q->full = 0;
return;
}
int factorial(int i){
if(i==0)
return i=1;
else
i=i*factorial(i-1);
return (i);
}
int poissonfunction (int i){
int time,c,t;
double p;
t = 1000;
c = lambda*t;
p = (pow(c,i)*exp(-c))/factorial(i);
time =(int) p*t;
return (time);
}
int rando(){
int value;
value=(int) random()/1000;
return (value);
}
Upvotes: 2
Views: 174
Reputation: 4722
I would suggest to modify your functions like:
void *producer (void *q)
{
queue *fifo;
int i,insert,sleep_time;
fifo = (queue *)q;
for (i = 0; i < LOOP; i++) {
pthread_mutex_lock (fifo->mut); /* take lock */
while (fifo->full) { /* check for fullness */
pthread_mutex_unlock (fifo->mut); /* if full wait for things to be removed */
sem_wait (fifo->sem_notFull); /* never sleep with lock */
pthread_mutex_lock (fifo->mut); /* take lock an recheck the condition to handle spurious wakeups */
}
/* here lock is held */
insert = rando(); /* safely manipulate the queue */
queueAdd (fifo, insert);
pthread_mutex_unlock (fifo->mut); /* safely release the queue */
sem_post (fifo->sem_notEmpty); /* wake up potential waiters */
sleep_time=poissonfunction(i);
usleep (sleep_time);
}
return (NULL);
}
void *consumer (void *q)
{
queue *fifo;
int i, d;
fifo = (queue *)q;
for (i = 0; i < LOOP; i++) { /* careful you don't produce 'enough' */
pthread_mutex_lock (fifo->mut); /* take lock before using fifo */
while (fifo->empty) { /* check is something to read */
pthread_mutex_unlock (fifo->mut); /* release lock before sleep */
sem_wait (fifo->sem_notEmpty); /* wait a sometihing to be pused in queue */
pthread_mutex_lock (fifo->mut); /* take back lock before checking for spurious wakeup */
}
/* from here lock is held, queue can be safely maniplated */
queueDel (fifo, &d);
pthread_mutex_unlock (fifo->mut); /* finished -> release lock */
sem_post (fifo->sem_notFull); /* wake up eventual waiters*/
usleep(2000);
}
return (NULL);
}
You need to check spurious wake ups as the semaphore is always notified even if the queue is not empty/full, thus being woke up does not mean the queue is in the correct state, thus a test is needed.
You may use conditional_variable to handle the full and empty event, this would remove the semaphores here.
Upvotes: 2