Reputation: 4670
In the classical producer-consumer problem, the producer sleeps when itemCount == BUFFER_SIZE
and wakes up again when it goes down. But once itemCount
reaches that limit, the producer thread is put to sleep. How can it know that itemCount
has gone down and that it needs to wake up?
Upvotes: 2
Views: 11822
Reputation: 67713
In pseudo-code the producer is something like:
void producer_thread()
{
    while(true)
        queue.push( produce() );
}
so consider the queue push method (I've used pthreads here, but the same logic applies with other libraries)
void SynchronizedQueue::push(Item const &i)
{
    pthread_mutex_lock(&mutex);
    // queue is full, so wait for consumer
    while (queue.size() == BUFFER_SIZE)
        pthread_cond_wait(&condition, &mutex);
    // when we get here, the queue has space
    this->queue.push_back(i);
    // make sure we wake a sleeping consumer
    if (queue.size() == 1)
        pthread_cond_signal(&condition);
    pthread_mutex_unlock(&mutex);
}
and the pop method used by the consumer:
Item SynchronizedQueue::pop()
{
    pthread_mutex_lock(&mutex);
    // wait for something to do
    while (queue.size() == 0)
        pthread_cond_wait(&condition, &mutex);
    // if we get here, we have some work
    Item tmp = queue.front();
    // the queue was full until now, so make sure we wake a sleeping producer
    if (queue.size() == BUFFER_SIZE)
        pthread_cond_signal(&condition);
    queue.pop_front();
    pthread_mutex_unlock(&mutex);
    return tmp;
}
Upvotes: 2
Reputation: 24847
It does not need to know - the OS will wake it up when a consumer signals. In the P-C queue code, the producer makes a wait() call on some OS synchronization primitive. This call does not return until a consumer thread makes space and signals the OS synchronization object (unless your faulty OS 'supports' spurious wakeups), at which point the waiting producer thread is made ready and, if there is an available core, runs immediately - the wait() call returns.
Traditionally, P-C queues are constructed from a simple non-thread-safe queue, a mutex to protect its indexes/pointers and two semaphores - one initialized to 0 to count items in the queue and one initialized to [queue size] to count empty spaces. The producer waits on 'emptySpace' and, when it gets a signal, locks the mutex, enqueues an object, unlocks the mutex and signals 'itemCount'. The consumer waits on 'itemCount' and, when it gets a signal, locks the mutex, dequeues an object, unlocks the mutex and signals 'emptySpace'.
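A rough sketch of that two-semaphore layout, in case it helps. The Semaphore class here is my own minimal stand-in built from a mutex and a condition variable (an assumption made for portability; C++20 has std::counting_semaphore and POSIX has sem_t), and SemaphoreQueue and fifoPreserved are hypothetical names used only for illustration. Note that each side unlocks the mutex before signalling the other semaphore.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Minimal counting semaphore (illustrative stand-in, not a library type).
class Semaphore {
public:
    explicit Semaphore(std::size_t initial) : count_(initial) {}

    // Blocks until the count is positive, then takes one unit.
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Adds one unit and wakes one waiter, if any.
    void signal() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ++count_;
        }
        cv_.notify_one();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t count_;
};

// Bounded queue built from a plain std::queue, a mutex and two semaphores.
class SemaphoreQueue {
public:
    explicit SemaphoreQueue(std::size_t size)
        : emptySpace_(size), itemCount_(0) {}

    void push(int item) {
        emptySpace_.wait();              // producer sleeps here when full
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(item);
        }                                // mutex released before signalling
        itemCount_.signal();             // wakes a waiting consumer
    }

    int pop() {
        itemCount_.wait();               // consumer sleeps here when empty
        int item;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            item = queue_.front();
            queue_.pop();
        }
        emptySpace_.signal();            // this is what wakes the producer
        return item;
    }

private:
    Semaphore emptySpace_;
    Semaphore itemCount_;
    std::mutex mutex_;
    std::queue<int> queue_;
};

// Sends 0..n-1 through the queue and reports whether order was preserved.
bool fifoPreserved(int n, std::size_t size) {
    SemaphoreQueue q(size);
    std::thread producer([&] { for (int i = 0; i < n; ++i) q.push(i); });
    bool ok = true;
    for (int i = 0; i < n; ++i) ok = (q.pop() == i) && ok;
    producer.join();
    return ok;
}
```

With a small queue size the producer blocks in emptySpace_.wait() most of the time, and it only continues after the consumer's pop() has signalled emptySpace_.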
Upvotes: 0
Reputation: 361252
You need condition variables.
A typical usage of a condition variable is this:
//lock the mutex first!
scoped_lock myLock(myMutex);
//wait till a condition is met
myConditionalVariable.wait(myLock, CheckCondition);
//Execute this code only if the condition is met
where CheckCondition is a function (or functor) which checks the condition (when to wake up, for example). wait() calls it internally before sleeping and again after every wakeup, including spurious ones; if the condition has not been met yet, wait() sleeps again. Before going to sleep, wait() releases the mutex atomically.
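Applied to the producer/consumer case, this could look roughly like the sketch below. It assumes C++11's std::condition_variable and uses two condition variables, one per direction; BoundedQueue, notFull_, notEmpty_ and sumThroughQueue are hypothetical names chosen for illustration, not taken from any library.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical bounded queue; a sketch, not a production implementation.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The producer sleeps here while the queue is full. wait() releases
        // the mutex while blocked and re-checks the predicate on each wakeup.
        notFull_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push_back(std::move(item));
        notEmpty_.notify_one();  // wake a consumer waiting for an item
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The consumer sleeps here while the queue is empty.
        notEmpty_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop_front();
        notFull_.notify_one();   // this is what wakes a sleeping producer
        return item;
    }

private:
    std::size_t capacity_;
    std::deque<T> queue_;
    std::mutex mutex_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
};

// Pushes 1..n from a producer thread and returns the sum seen by the consumer.
long long sumThroughQueue(int n, std::size_t capacity) {
    BoundedQueue<int> q(capacity);
    std::thread producer([&] { for (int i = 1; i <= n; ++i) q.push(i); });
    long long sum = 0;
    for (int i = 0; i < n; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

So the producer never polls itemCount: it blocks inside wait(), and the consumer's notify_one() after removing an item is what makes it runnable again. For example, sumThroughQueue(100, 2) returns 5050 even though the queue holds at most two items at a time.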
If your compiler supports std::condition_variable, introduced by C++11, then you can see this for detail:
If your compiler doesn't support it, and you work with win32 threads, then see this:
And here is a complete example.
And if you work with POSIX threads, then see this:
You can see my implementation of conditional_variable using win32 primitives here:
Scroll down and see its implementation first, then see the usage in the concurrent queue implementation.
Upvotes: 1