Reputation: 345
I am having trouble writing C code in which two producers each have their own buffer, and a single consumer consumes one item from either buffer, waiting only if both buffers are empty. I know how to write the version where the consumer waits if either producer's buffer is empty:
    sem_wait(Xfull);
    item1 = BufferX[outX];
    sem_signal(Xempty);
    sem_wait(Yfull);
    item2 = BufferY[outY];
    sem_signal(Yempty);
But I'm stuck on the case where the consumer should block only when both buffers are empty.
Upvotes: 4
Views: 1690
Reputation: 2899
You could have a semaphore that counts the total number of unconsumed items in both queues. Producers post to it and consumer(s) wait on it. Once a consumer's wait on the semaphore succeeds, it uses mutexes to examine each queue in turn and find an item to consume.
    sem_t ProducedItems; // counts the total number of unconsumed items in Q A and Q B
    sem_t Q_A;           // binary semaphore used as a mutex on Q A
    sem_t Q_B;           // binary semaphore used as a mutex on Q B

    void *consumer(void *arg)
    {
        int bias = 0; // NOTE: could be used to avoid always favoring Q A over Q B
        while (1)
        {
            int got_item = 0;

            // wait for an item to be produced
            sem_wait(&ProducedItems);

            // find an item to consume
            sem_wait(&Q_A);
            if (Q A not empty)
            {
                remove an item from Q A;
                got_item = 1;
                bias = 1;
            }
            sem_post(&Q_A);

            if (!got_item)
            {
                // Q A was empty, so the item counted above must be in Q B
                sem_wait(&Q_B);
                remove an item from Q B;
                bias = 0;
                sem_post(&Q_B);
            }

            // consume item
        }
        return NULL;
    }
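For reference, here is a minimal compilable sketch of the same idea, assuming POSIX unnamed semaphores (sem_init) are available and using pthread mutexes for the per-queue locks. The names queue_t, queue_put, queue_try_get, consume_one and run_demo are made up for illustration, the capacity QCAP is arbitrary, and error handling (including EINTR from sem_wait) is left out for brevity.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define QCAP 64 /* hypothetical capacity; this sketch has no back-pressure */

/* Hypothetical fixed-size FIFO guarded by its own mutex. */
typedef struct {
    int items[QCAP];
    int head;  /* index of the oldest item */
    int count; /* number of stored items */
    pthread_mutex_t lock;
} queue_t;

static queue_t q_a, q_b;
static sem_t produced_items; /* total unconsumed items in q_a and q_b */

static void queue_put(queue_t *q, int v)
{
    pthread_mutex_lock(&q->lock);
    assert(q->count < QCAP);
    q->items[(q->head + q->count) % QCAP] = v;
    q->count++;
    pthread_mutex_unlock(&q->lock);
    sem_post(&produced_items); /* publish only after the item is stored */
}

/* Returns 1 and stores the oldest item in *out, or 0 if q is empty. */
static int queue_try_get(queue_t *q, int *out)
{
    int ok = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % QCAP;
        q->count--;
        ok = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Blocks only while both queues are empty. */
static int consume_one(void)
{
    int v = 0;
    sem_wait(&produced_items);
    if (!queue_try_get(&q_a, &v))
        queue_try_get(&q_b, &v); /* A was empty, so the item is in B */
    return v;
}

typedef struct { queue_t *q; int first; int n; } producer_arg;

static void *producer(void *p)
{
    producer_arg *a = p;
    for (int i = 0; i < a->n; i++)
        queue_put(a->q, a->first + i);
    return NULL;
}

/* Runs two producers of n items each (n <= QCAP) and returns the sum
 * of everything the single consumer removed. */
long run_demo(int n)
{
    producer_arg pa = { &q_a, 1, n }, pb = { &q_b, 1001, n };
    pthread_t ta, tb;
    long sum = 0;

    q_a.head = q_a.count = 0;
    q_b.head = q_b.count = 0;
    pthread_mutex_init(&q_a.lock, NULL);
    pthread_mutex_init(&q_b.lock, NULL);
    sem_init(&produced_items, 0, 0);

    pthread_create(&ta, NULL, producer, &pa);
    pthread_create(&tb, NULL, producer, &pb);
    for (int i = 0; i < 2 * n; i++)
        sum += consume_one();
    pthread_join(ta, NULL);
    pthread_join(tb, NULL);

    sem_destroy(&produced_items);
    pthread_mutex_destroy(&q_a.lock);
    pthread_mutex_destroy(&q_b.lock);
    return sum;
}
```

Calling run_demo(10) has producer A enqueue 1..10 and producer B enqueue 1001..1010, so the consumer should see a total of 10110 regardless of how the threads interleave.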
If consumers also need to signal back to producers, for example to ensure that the Q's don't grow too large, then you could also have a semaphore for each producer indicating how much space is left in its Q. A producer would wait on its semaphore before adding an item, and the consumer would post to it each time it removes an item from that producer's Q.
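A rough sketch of that back-pressure variant follows, again assuming POSIX unnamed semaphores. The type bounded_q, the functions bq_put, bq_take_any and run_bounded_demo, and the constants CAP and PER_PRODUCER are all invented for this example; CAP is deliberately small so the producers really do block on their space semaphore.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define CAP 4            /* hypothetical capacity of each queue */
#define PER_PRODUCER 100 /* hypothetical number of items per producer */

typedef struct {
    int buf[CAP];
    int in, out; /* next write index / next read index */
    int count;
    pthread_mutex_t lock;
    sem_t space; /* free slots left in this queue */
} bounded_q;

static bounded_q bq[2];
static sem_t total_items; /* unconsumed items across both queues */

static void bq_put(bounded_q *q, int v)
{
    sem_wait(&q->space); /* block while this queue is full */
    pthread_mutex_lock(&q->lock);
    q->buf[q->in] = v;
    q->in = (q->in + 1) % CAP;
    q->count++;
    pthread_mutex_unlock(&q->lock);
    sem_post(&total_items);
}

/* Removes one item from whichever queue has one. */
static int bq_take_any(void)
{
    int v = 0;
    sem_wait(&total_items); /* blocks only if both queues are empty */
    for (int i = 0; i < 2; i++) {
        bounded_q *q = &bq[i];
        int got = 0;
        pthread_mutex_lock(&q->lock);
        if (q->count > 0) {
            v = q->buf[q->out];
            q->out = (q->out + 1) % CAP;
            q->count--;
            got = 1;
        }
        pthread_mutex_unlock(&q->lock);
        if (got) {
            sem_post(&q->space); /* tell that producer a slot is free */
            break;
        }
    }
    return v;
}

static void *bq_producer(void *arg)
{
    bounded_q *q = arg;
    for (int i = 1; i <= PER_PRODUCER; i++)
        bq_put(q, i);
    return NULL;
}

/* Runs both producers against one consumer; returns the consumed sum. */
long run_bounded_demo(void)
{
    pthread_t t[2];
    long sum = 0;

    sem_init(&total_items, 0, 0);
    for (int i = 0; i < 2; i++) {
        bq[i].in = bq[i].out = bq[i].count = 0;
        pthread_mutex_init(&bq[i].lock, NULL);
        sem_init(&bq[i].space, 0, CAP);
    }
    for (int i = 0; i < 2; i++)
        pthread_create(&t[i], NULL, bq_producer, &bq[i]);
    for (int i = 0; i < 2 * PER_PRODUCER; i++)
        sum += bq_take_any();
    for (int i = 0; i < 2; i++) {
        pthread_join(t[i], NULL);
        sem_destroy(&bq[i].space);
        pthread_mutex_destroy(&bq[i].lock);
    }
    sem_destroy(&total_items);
    return sum;
}
```

Each producer enqueues 1..100, so run_bounded_demo() should return 10100 even though neither queue can ever hold more than four items at once.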
If you wanted to generalize this to N production Q's, then you'd likely want to keep a bit mask, protected by a mutex, that indicates which Q's have unconsumed items on them.
Once a consumer acquires the ProducedItems semaphore, it would lock the mask to see which Q's have unconsumed items and step through them round-robin to decide which Q to read next. It would then lock that Q, grab an item, clear the Q's bit in the mask if the Q became empty, release the mutexes, and finally consume the item.
Producers would put items into their Q. If the Q had been empty, they would also lock the global bit mask and set that Q's bit in it. They would then release the mutexes and post the ProducedItems semaphore.
Or rather than a bit mask, you could have an array of the sizes of the Q's, although this would require the producers to lock the mutex every time that they produce, etc.
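The round-robin selection over the bit mask could look something like the helper below. This is only a sketch: the function name next_queue and the choice of a 32-bit mask are assumptions, and the caller is expected to hold the mask's mutex while calling it.

```c
#include <assert.h>
#include <stdint.h>

/* Picks the next non-empty queue after `last`, wrapping around.
 * Bit i of mask is set when queue i has unconsumed items.
 * n is the number of queues (1..32); pass last = -1 on the first call.
 * Returns the queue index, or -1 if no bit is set. */
int next_queue(uint32_t mask, int n, int last)
{
    for (int step = 1; step <= n; step++) {
        int i = (last + step) % n;
        if (mask & (UINT32_C(1) << i))
            return i;
    }
    return -1;
}
```

With four queues and mask 0x5 (queues 0 and 2 non-empty), a consumer that last read queue 0 gets queue 2 next, and one that last read queue 2 wraps around to queue 0, so no single queue is favored.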
Upvotes: 1
Reputation: 6333
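A condition variable is applicable in this case.
If I understand your problem correctly, you have one consumer consuming from two producers. In that case you can use:
    pthread_cond_t *cond;
    pthread_mutex_t *cond_mutex;
    int pending = 0; // set by producers, protected by cond_mutex
    sem_t *x, *y;    // guard buffers X and Y
In the consumer:
    while (exit_cond) {
        // pthread_cond_wait must be called with the mutex held, and the
        // flag re-checked in a loop to handle missed or spurious wakeups
        pthread_mutex_lock(cond_mutex);
        while (!pending)
            pthread_cond_wait(cond, cond_mutex);
        pending = 0;
        pthread_mutex_unlock(cond_mutex);
        if (sem_trywait(x) == 0)
            consumeX(); // then post x
        if (sem_trywait(y) == 0)
            consumeY(); // then post y
    }
In both producers:
    while (exit_cond) {
        produce(); // hold and release semaphore
        pthread_mutex_lock(cond_mutex);
        pending = 1;
        pthread_cond_signal(cond);
        pthread_mutex_unlock(cond_mutex);
    }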
This way, the consumer and the producers can run as much in parallel as possible.
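A self-contained way to see the condition-variable approach is to let one mutex protect a pair of item counts and have the consumer wait on the predicate "both counts are zero". In the sketch below the counters count_x and count_y stand in for real buffers, and the names produce_x, produce_y and consume_any are invented for the example.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER;
static int count_x, count_y; /* items waiting in X and Y (stand-ins for buffers) */

void produce_x(void)
{
    pthread_mutex_lock(&mtx);
    count_x++;
    pthread_cond_signal(&nonempty);
    pthread_mutex_unlock(&mtx);
}

void produce_y(void)
{
    pthread_mutex_lock(&mtx);
    count_y++;
    pthread_cond_signal(&nonempty);
    pthread_mutex_unlock(&mtx);
}

/* Consumes one item and returns 'X' or 'Y' to say where it came from.
 * Blocks only while both buffers are empty. */
char consume_any(void)
{
    char from;
    pthread_mutex_lock(&mtx);
    while (count_x == 0 && count_y == 0) /* loop guards against spurious wakeups */
        pthread_cond_wait(&nonempty, &mtx);
    if (count_x > 0) {
        count_x--;
        from = 'X';
    } else {
        count_y--;
        from = 'Y';
    }
    pthread_mutex_unlock(&mtx);
    return from;
}
```

Because the predicate is checked under the mutex before every wait, a signal sent before the consumer starts waiting is not lost: the consumer simply sees a non-zero count and never blocks.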
Upvotes: 0
Reputation: 6642
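One option is to poll both semaphores in a loop, sleeping briefly between attempts:
    int sem = 0; // which semaphore was acquired: 1 or 2
    do {
        if (sem_trywait(sem1) == 0) {
            sem = 1;
            break;
        }
        if (sem_trywait(sem2) == 0) {
            sem = 2;
            break;
        }
        usleep(1); // back off briefly; trades CPU time and latency for simplicity
    } while (1);
    if (sem == 1) {
        ...
    }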
Another option, which I found in this answer, is to look at Sun's implementation of the WaitForMultipleObjects API. To quote:
The basic idea is to associate a list of condition variables to a handle (protected by a mutex), and signal all of the condition variables whenever the handle is signaled. Each time you call the emulated WaitForMultipleObjects, a new condition variable is created and added to the list of all handles you are interested in. In the WaitForMultipleObjects emulation, you block on the condition variable, and check each of your handles when you wake up.
The paper can be found here.
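To make the quoted idea concrete, here is a small sketch of it using pthreads. It is not Sun's code: the types handle_t and waiter_t, the functions handle_init, handle_signal and wait_any, the fixed MAX_WAITERS limit, and the auto-reset behavior of a signaled handle are all assumptions made for illustration.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_WAITERS 8 /* hypothetical limit to keep the sketch short */

typedef struct {
    pthread_mutex_t m;
    pthread_cond_t cv;
    int fired; /* set when any handle we registered with is signaled */
} waiter_t;

typedef struct {
    pthread_mutex_t m;
    int signaled;
    waiter_t *waiters[MAX_WAITERS]; /* condition variables to notify */
    int nwaiters;
} handle_t;

void handle_init(handle_t *h)
{
    pthread_mutex_init(&h->m, NULL);
    h->signaled = 0;
    h->nwaiters = 0;
}

/* Marks the handle signaled and wakes every registered waiter. */
void handle_signal(handle_t *h)
{
    pthread_mutex_lock(&h->m);
    h->signaled = 1;
    for (int i = 0; i < h->nwaiters; i++) {
        waiter_t *w = h->waiters[i];
        pthread_mutex_lock(&w->m);
        w->fired = 1;
        pthread_cond_signal(&w->cv);
        pthread_mutex_unlock(&w->m);
    }
    pthread_mutex_unlock(&h->m);
}

/* Adds (add != 0) or removes (add == 0) w from h's waiter list. */
static void set_registered(handle_t *h, waiter_t *w, int add)
{
    pthread_mutex_lock(&h->m);
    if (add) {
        assert(h->nwaiters < MAX_WAITERS);
        h->waiters[h->nwaiters++] = w;
    } else {
        for (int i = 0; i < h->nwaiters; i++)
            if (h->waiters[i] == w) {
                h->waiters[i] = h->waiters[--h->nwaiters];
                break;
            }
    }
    pthread_mutex_unlock(&h->m);
}

/* Blocks until one of hs[0..n-1] is signaled; clears that handle's
 * signal (auto-reset) and returns its index. */
int wait_any(handle_t **hs, int n)
{
    waiter_t w;
    int found = -1;

    pthread_mutex_init(&w.m, NULL);
    pthread_cond_init(&w.cv, NULL);
    w.fired = 0;
    for (int i = 0; i < n; i++)
        set_registered(hs[i], &w, 1);

    while (found < 0) {
        /* check each handle we are interested in */
        for (int i = 0; i < n && found < 0; i++) {
            pthread_mutex_lock(&hs[i]->m);
            if (hs[i]->signaled) {
                hs[i]->signaled = 0;
                found = i;
            }
            pthread_mutex_unlock(&hs[i]->m);
        }
        if (found < 0) {
            /* nothing yet: block on our own condition variable */
            pthread_mutex_lock(&w.m);
            while (!w.fired)
                pthread_cond_wait(&w.cv, &w.m);
            w.fired = 0;
            pthread_mutex_unlock(&w.m);
        }
    }

    for (int i = 0; i < n; i++)
        set_registered(hs[i], &w, 0);
    pthread_cond_destroy(&w.cv);
    pthread_mutex_destroy(&w.m);
    return found;
}
```

For the two-producer problem, each producer would call handle_signal on its own handle after adding an item, and the consumer would call wait_any on both. Note that this sketch treats a handle as a binary flag, so a real producer/consumer setup would still need per-buffer counts to avoid losing track of multiple items.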
Upvotes: 0