Reputation: 483
Like everyone I was trying to understand linux OS better. I was implementing the producer consumer synchronization problem using semaphores.
i)There will be m producer processes and n consumer processes.
ii) A product is presented with a random number.
iii) The buffer
is shared between all consumer and producer processes while in
variable is shared between all producer processes and out
variable is shared between all consumer processes.
iv)All processes will stop once CTRL
+C
is entered in the terminal.
Here are the codes:
1.shared.h
//assume all required headers included
#define buffer_size 5
#define empty_id 0
#define full_id 1
#define mutex_id 2
#define no_sem 3
struct sembuf signall = {0 , 1, 0};
struct sembuf wait = {0, -1, 0};
#define W(s) semop(s, &wait, 1);
#define S(s) semop(s, &signall, 1);
int shmid;
int *buffer;
union semun
{
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
} setvalarg[3];
int *create_shared_mem_buffer()
{
int *shmaddr;
key_t key = ftok("/home/******/**/B", 1);
shmid = shmget(key, buffer_size, IPC_CREAT|0660);
shmaddr= (int *)shmat(shmid, NULL, 0);
return shmaddr;
}
void clear_buffer()
{
int i;
for(i=0;i<buffer_size;i++)
buffer[i]=0;
}
void releaseSHM(int signum)
{
int status;
clear_buffer();
status = shmctl(shmid, IPC_RMID, NULL);
status = kill(0, SIGKILL);
exit(signum);
}
int create_semaphore_set()
{
key_t key= ftok("/home/******/**/A", 1);
int semid= semget(key, no_sem, IPC_CREAT|0600);
setvalarg[0].val=buffer_size;
semctl(semid, empty_id, SETVAL, setvalarg[0]);
setvalarg[1].val=0;
semctl(semid, full_id, SETVAL, setvalarg[1]);
setvalarg[2].val=1;
semctl(semid, mutex_id, SETVAL, setvalarg[2]);
return semid;
}
2.producer.c
#include "shared.h"
void insert_product(int item, int *in, int *buffer)
{
*in=(*in+1)%buffer_size;
buffer[*in]=item;
printf("Producer produces item %d stored in posn %d \n",item,*in);
}
int shmid1;
int main(int argc, char *argv[])
{
int i, pid, item;
int *in;
buffer = create_shared_mem_buffer();
int semid= create_semaphore_set();
sighandler_t shandler;
shandler = signal(SIGINT, releaseSHM);
shmid1= shmget(IPC_PRIVATE, sizeof(int),IPC_CREAT | 0777);
in=(int *)shmat(shmid1, NULL, 0);
*in=0;
int m=5;
for(i=0;i<m;i++)
{
pid=fork();
if(pid==0)
{
while(1)
{
item=rand();
wait.sem_num=0;
W(semid);
wait.sem_num=2;
W(semid);
insert_product(item, in, buffer);
signall.sem_num=2;
S(semid);
signall.sem_num=1;
S(semid);
sleep(2); //sleep has been introduced to slowdown the output.
}
}
}
return 0;
}
#include "shared.h"
int remove_product(int *out, int *buffer)
{
int item;
*out=(*out+1)%buffer_size;
item=buffer[*out];
buffer[*out]=0;
return item;
}
int shmid1;
int main(int argc, char *argv[])
{
int i, pid, item;
int *out;
buffer = create_shared_mem_buffer();
int semid= create_semaphore_set();
sighandler_t shandler;
shandler = signal(SIGINT, releaseSHM);
shmid1= shmget(IPC_PRIVATE, sizeof(int),IPC_CREAT | 0777);
out=(int *)shmat(shmid1, NULL, 0);
*out=-1;
clear_buffer(buffer);
int n=2;
for(i=0;i<n;i++)
{
pid=fork();
if(pid==0)
{
while(1)
{
wait.sem_num=1;
W(semid);
wait.sem_num=2;
W(semid);
item=remove_product(out, buffer);
printf("Consumer consumes the product %d \n", item);
signall.sem_num=2;
S(semid);
signall.sem_num=0;
S(semid);
sleep(2); //sleep has been introduced to slow down the output
}
}
}
return 0;
}
The output is somewhat like this:
Producer produces item 1681692777 stored in posn 2
Producer produces item 1681692777 stored in posn 3
Producer produces item 1681692777 stored in posn 4
Producer produces item 1804289383 stored in posn 1
Producer produces item 1804289383 stored in posn 2
Producer produces item 1804289383 stored in posn 3
Producer produces item 1804289383 stored in posn 0
Producer produces item 1804289383 stored in posn 0
Producer produces item 1681692777 stored in posn 0
Producer produces item 1714636915 stored in posn 1
Producer produces item 1714636915 stored in posn 2
Producer produces item 1714636915 stored in posn 3
Producer produces item 846930886 stored in posn 1
Producer produces item 1714636915 stored in posn 4
Producer produces item 846930886 stored in posn 2
Producer produces item 1714636915 stored in posn 0
Producer produces item 846930886 stored in posn 3
Producer produces item 846930886 stored in posn 4
Producer produces item 846930886 stored in posn 0
Producer produces item 1681692777 stored in posn 1
Consumer consumes the product 424238335
Consumer consumes the product 424238335
Consumer consumes the product 424238335
Consumer consumes the product 424238335
Consumer consumes the product 719885386
Consumer consumes the product 719885386
Consumer consumes the product 719885386
Consumer consumes the product 719885386
Consumer consumes the product 719885386
Consumer consumes the product 1649760492
Consumer consumes the product 1649760492
Consumer consumes the product 596516649
Consumer consumes the product 1649760492
Consumer consumes the product 1649760492
Consumer consumes the product 1649760492
Consumer consumes the product 596516649
Consumer consumes the product 596516649
Consumer consumes the product 596516649
As you can see, the same product is produced more than once and same product is consumed more than once.
i)What is the problem? How to resolve it?
ii) Also the processes donot stop when CTRL
+C
is entered.
I have done it all by myself and I need someone to review it. If further information needed, drop a comment below.
Upvotes: 1
Views: 711
Reputation: 18430
Here, you fork()
into several (five for m=5) processes:
for(i=0;i<m;i++)
{
pid=fork();
and here, you are producing random item numbers:
item=rand();
but all processes are using the same seed (which is 1
, because you did not call srand()
before)! So, all forked processes generate the same sequence of random numbers. This is the reason why you add every item 5 times. Try seeding the rng with a different number for each processes, for example by adding an srand()
here:
for(i=0;i<m;i++)
{
srand(m);
pid=fork();
Then, no duplicates should be added.
Upvotes: 0
Reputation: 8995
It's a little hard to understand your intended purpose for the apparently three semaphores that you're using in your logic. However, the problem is that you're "racing" for pulling off the numbers. And your logic is too complex.
Basically, you need this:
A semaphore which counts the number of numbers that are waiting to be consumed. post()
the semaphore when you add a number; wait()
to get one.
A mutex which controls access to the list itself, so that only one thread at a time can actually be inserting or removing values from the segment.
You might also need another semaphore which limits the number of entries that can be added to the segment ... you wait()
for permission to add the entry, and as the other processes remove the entry (thus freeing up a slot) they post()
to let anyone who's waiting proceed.
Finally: what you're doing here is an exercise. In real life there are "pipes" and other IPC mechanisms available "off the shelf" which will take care of these details for you.
Upvotes: 1