Reputation: 361532
I'm implementing a concurrent_blocking_queue
with minimal functions:
//a thin wrapper over std::queue
template<typename T>
class concurrent_blocking_queue
{
std::queue<T> m_internal_queue;
//...
public:
void add(T const & item);
T& remove();
bool empty();
};
I intend to use this for producer-consumer problem (I guess, it is where one uses such data structures?). But I'm stuck on one problem which is:
How to elegantly notify consumer when producer is done? How would the producer notify the queue when it is done? By calling a specifiic member function, say done()
? Is throwing exception from the queue (i.e from remove
function) a good idea?
I came across many examples, but all has infinite loop as if the producer will produce items forever. None discussed the issue of stopping condition, not even the wiki article.
Upvotes: 5
Views: 2406
Reputation: 153939
My queues have usually used pointers (with an std::auto_ptr
in the
interface, to clearly indicate that the sender may no longer access the
pointer); for the most part, the queued objects were polymorphic, so
dynamic allocation and reference semantics were required anyway.
Otherwise, it shouldn't be too difficult to add an “end of
file” flag to the queue. You'd need a special function on the
producer side (close
?) to set it (using exactly the same locking
primitives as when you write to the queue), and the loop in the removal
function must wait for either something to be there, or the queue to be
closed. Of course, you'll need to return a Fallible
value, so that
the reader can know whether the read succeeded or not. Also, don't
forget that in this case, you need a notify_all
to ensure that all
processes waiting on the condition are awoken.
BTW: I don't quite see how your interface is implementable. What does
the T&
returned by remove
refer to. Basically, remove
has to be
something like:
Fallible<T>
MessageQueue<T>::receive()
{
ScopedLock l( myMutex );
while ( myQueue.empty() && ! myIsDone )
myCondition.wait( myMutex );
Fallible<T> results;
if ( !myQueue.empty() ) {
results.validate( myQueue.top() );
myQueue.pop();
}
return results;
}
Even without the myIsDone
condition, you have to read the value into a
local variable before removing it from the queue, and you can't return a
reference to a local variable.
For the rest:
void
MessageQueue<T>::send( T const& newValue )
{
ScopedLock l( myMutex );
myQueue.push( newValue );
myCondition.notify_all();
}
void
MessageQueue<T>::close()
{
ScopedLock l( myMutex );
myIsDone = true;
myCondition.notify_all();
}
Upvotes: 1
Reputation: 24867
'Stopping' is not often discussed because it's often never done. In those cases where it is required, it's often just as easier and more flexible to enqueue a poison-pill using the higher-level P-C protocol itself as it is to build extra functionality into the queue itself.
If you really want to do this, you could indeed set a flag that causes every consumer to raise an exception, either 'immediately' or whenever it gets back to the queue, but there are problems. Do you need the 'done' method to be synchronous, ie. do you want all the consumers gone by the time 'done' returns, or asynchronous, ie. the last consumer thread calls an event parameter when all the other the consumers are gone?
How are you going to arrange for those consumers that are currently waiting to wake up? How many are waiting and how many are busy, but will return to the queue when they have done their work? What if one or more consumers are stuck on a blocking call, (perhaps they can be unblocked, but that requires a call from another thread - how are you going to do that)?
How are the consumers going to notify that they have handled their exception and are about to die? Is 'about to die' enough, or do you need to wait on the thread handle? If you have to wait on the thread handle, what is going to do the waiting - the thread requesting the queue shutdown or the last consumer thread to notify?
Oh yes - to be safe, you should arrange for producer threads that turn up with objects to queue up while in 'shutting down' state to raise an exception as well.
I raise these questions becasue I've done all this once, a long time ago. Eventually, it all worked-ish. The objects queued up all had to have a 'QueuedItem' inserted into their inheritance chain, (so that a job-cancellation method could be exposed to the queue), and the queue had to keep a thread-safe list of objects that had been popped-off by threads but not processed yet.
After a while, I stopped using the class in favour of a simple P-C queue with no special shutdown mechanism.
Upvotes: 1
Reputation: 2612
It is true that it's common to enqueue a special "we're done" message; however I think OP's original desire for an out-of-band indicator is reasonable. Look at the complexity people are contemplating to set up an in-band completion message! Proxy types, templating; good grief. I'd say a done()
method is simpler and easier, and it makes the common case (we're not done yet) faster and cleaner.
I would agree with kids_fox that a try_remove that returns an error code if the queue is done is preferred, but that's stylistic and YMMV.
Edit: Bonus points for implementing a queue that keeps track of how many producers are remaining in a multiple-producers situation and raises the done exception iff all producers have thrown in the towel ;-) Not going to do that with in-band messages!
Upvotes: 2
Reputation: 2026
I've simply introduced a dummy "done" product in the past. So if the producer can create "products" of, say, type A and type B, I've invented type "done". When a consumer encounters a product of type "done" it knows that further processing isn't required anymore.
Upvotes: 5