Reputation: 1052
I am trying to write a program with multiple producers and consumers. I created several producer and consumer threads and used semaphores for synchronization. The code worked fine with a single producer and a single consumer.
The problem I am facing is that, after the program has been running for some time, only consumer 1 and producer 1 appear to take part. I do not understand what happened to the other producers and consumers.
I would also like to know how to make the multi producer-consumer problem efficient, in the sense that every producer and consumer gets an equal opportunity to produce and consume respectively. C++ code (it includes a lot of C):
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
using namespace std;

sem_t empty;
sem_t full;
int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
queue<int> q;

void *producer(void *a)
{
    int *num = (int *)a;
    while(1) {
        sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        cnt = cnt+1;
        q.push(cnt);
        cout<<cnt<<" item produced by producer "<<(*num+1)<<endl;
        pthread_mutex_unlock(&mutex);
        sem_post(&full);
        sleep(1);
    }
}

void *consumer(void *a)
{
    int *num = (int *)a;
    while(1) {
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        cout<<q.front()<<" item consumed by consumer "<<(*num+1)<<endl;
        q.pop();
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);
        sleep(1);
    }
}

int main()
{
    pthread_t p[5];
    pthread_t c[5];
    sem_init(&empty,0,5);
    sem_init(&full,0,0);
    int i;
    for(i = 0; i < 5; i++) {
        pthread_create(&p[i],NULL,producer,(void *)(&i));
    }
    for(i = 0; i < 5; i++) {
        pthread_create(&c[i],NULL,consumer,(void *)(&i));
    }
    for(i = 0; i < 5; i++) {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }
}
Updated code:
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
#include <map>
using namespace std;

sem_t empty;
sem_t full;
int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
map<pthread_t,int> mc,mp;
queue<int> q;

void *producer(void *a)
{
    while(1) {
        sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        cnt = cnt+1;
        q.push(cnt);
        cout<<cnt<<" item produced by producer "<<mp[pthread_self()]<<endl;
        pthread_mutex_unlock(&mutex);
        sem_post(&full);
        sleep(1);
    }
}

void *consumer(void *a)
{
    while(1) {
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        cout<<q.front()<<" item consumed by consumer "<<mc[pthread_self()]<<endl;
        q.pop();
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);
        sleep(1);
    }
}

int main()
{
    pthread_t p[5];
    pthread_t c[5];
    sem_init(&empty,0,5);
    sem_init(&full,0,0);
    int i;
    pthread_mutex_lock(&mutex);
    for(i = 0; i < 5; i++) {
        pthread_create(&p[i],NULL,producer,NULL);
        pthread_create(&c[i],NULL,consumer,NULL);
        mc[c[i]] = i+1;
        mp[p[i]] = i+1;
    }
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < 5; i++) {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }
}
Upvotes: 2
Views: 637
Reputation: 73500
Short answer
The threads do in fact execute with equal opportunity, but they print an identifier that is not theirs.
Detailed explanation
In each thread you keep a pointer num to the thread number. It is the pointer that is saved, not the value itself. So all the threads point to the same counter, expecting to find their own identifier there.
Every time you access *num, you get not the value that i had when you launched the thread, but its current value.
Unfortunately, every loop in main() reuses the variable i. The last loop sets i back to 0 and then waits for the first threads to join. But all these threads loop forever, so that loop hardly gets a chance to move past this initial value of 0. As a result, every thread thinks its number is *num+1, which is 1 at this moment.
Note, by the way, that you create a race condition, as someone pointed out in the comments: all the consumer and producer threads dereference the pointer and read the same variable inside a mutex-protected region. That part is OK. But while they are reading the variable, the main thread can still change it outside of any lock. This is definitely a data race.
Workaround
std::thread would allow you to pass i by value, so that each thread has its own unaltered copy of its id.
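As a minimal sketch of the by-value approach, the following starts a few workers with std::thread and records the id each one received. The function name collect_ids_by_value and the seen vector are hypothetical, introduced only for this illustration; they are not part of the question's code.

```cpp
#include <algorithm>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: starts n workers and returns the ids they saw, sorted.
std::vector<int> collect_ids_by_value(int n)
{
    std::vector<int> seen;   // ids reported by the workers
    std::mutex m;            // protects seen
    std::vector<std::thread> workers;

    for (int i = 0; i < n; i++) {
        // i is copied into the new thread when it is constructed, so later
        // changes to the loop variable cannot affect the worker's id.
        workers.emplace_back([&seen, &m](int id) {
            std::lock_guard<std::mutex> lock(m);
            seen.push_back(id + 1);
        }, i);
    }
    for (auto &w : workers) {
        w.join();
    }
    std::sort(seen.begin(), seen.end());
    return seen;
}
```

Each worker reports a distinct id here because the argument is copied per thread, rather than read through a pointer to the shared loop variable.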
With pthreads you have to pass a pointer to a value. Unfortunately, even if you'd do a local copy of the value pointed at, right at the start of the thread, you'd still be in a race condition.
A quick workaround to observe which thread is really doing the work would be to also print the result of pthread_self() (see here how to do it). Another is to store the ids in an array of int and pass each thread the address of a unique element in that array.
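The array idea can be sketched with plain pthreads as follows. This is an illustration under assumptions, not a drop-in fix: the names worker, collect_ids_from_array, seen and seen_mutex are hypothetical, and the workers run once instead of looping forever so that the function can return.

```cpp
#include <pthread.h>
#include <algorithm>
#include <vector>

static pthread_mutex_t seen_mutex = PTHREAD_MUTEX_INITIALIZER;
static std::vector<int> seen;   // ids reported by the workers

static void *worker(void *arg)
{
    // arg points to this thread's own array slot, which nobody modifies
    // after the thread has been created.
    int id = *static_cast<int *>(arg);
    pthread_mutex_lock(&seen_mutex);
    seen.push_back(id);
    pthread_mutex_unlock(&seen_mutex);
    return nullptr;
}

// Hypothetical helper: runs 5 workers and returns the ids they saw, sorted.
std::vector<int> collect_ids_from_array()
{
    const int N = 5;
    pthread_t t[N];
    int ids[N];   // one slot per thread; stays alive until all joins finish

    seen.clear();
    for (int i = 0; i < N; i++) {
        ids[i] = i + 1;   // written before the thread that reads it starts
        pthread_create(&t[i], nullptr, worker, &ids[i]);
    }
    for (int i = 0; i < N; i++) {
        pthread_join(t[i], nullptr);
    }
    std::sort(seen.begin(), seen.end());
    return seen;
}
```

Because each slot is written before its thread is created and never changed afterwards, the workers no longer read a variable that main() is still updating.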
Upvotes: 2