Reputation: 2419
I'm writing an application that has an event queue. I want multiple threads to be able to write to the queue while a single thread reads from it and hands each popped element to another thread for processing, so that the next pop is not blocked. I use a mutex and a condition variable for pushing and popping items:
void Publisher::popEvent(boost::shared_ptr<Event>& event) {
    boost::mutex::scoped_lock lock(queueMutex);
    while (eventQueue.empty())
    {
        queueConditionVariable.wait(lock);
    }
    event = eventQueue.front();
    eventQueue.pop();
    lock.unlock();
}
void Publisher::pushEvent(boost::shared_ptr<Event> event) {
    boost::mutex::scoped_lock lock(queueMutex);
    eventQueue.push(event);
    lock.unlock();
    queueConditionVariable.notify_one();
}
In the constructor of the Publisher class (only one instance is created), I start one thread that loops, waiting until it is woken by a notify_one(), and then starts another thread to process the event popped from the queue:
In constructor:
publishthreadGroup = boost::shared_ptr<boost::thread_group> (new boost::thread_group());
publishthreadGroup->create_thread(boost::bind(queueProcessor, this));
queueProcessor method:
void queueProcessor(Publisher* agent) {
    while (true) {
        boost::shared_ptr<Event> event;
        agent->getEvent(event);
        agent->publishthreadGroup->create_thread(boost::bind(dispatcher, agent, event));
    }
}
In the dispatcher method, the relevant processing is done and the processed information is published to a server via Thrift. In another method, which is called from the main thread before the program exits, I call join_all() so that the main thread waits until the threads are done.
In this implementation, after the dispatcher thread is created in the while loop above, I experience a deadlock/hang: the running code seems to be stuck. What is the issue in this implementation? And is there a cleaner, better way of doing what I am trying to do (multiple producers and one consumer thread that iterates through the queue and hands off the processing of each element to a different thread)?
Thank you!
Upvotes: 2
Views: 603
Reputation: 2987
It seems that the queueProcessor function will run forever and the thread running it will never exit. Any threads created by that function will do their work and exit, but this thread (the first one created in the publishthreadGroup) has a while(true) loop that has no way of stopping. Thus a call to join_all() will wait forever. Can you create some other flag variable that triggers that function to exit the loop and return? That should do the trick!
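As a rough illustration of the flag idea, here is a minimal sketch. It is not the asker's code: it assumes standard C++11 threading (std::thread, std::mutex, std::condition_variable) in place of Boost so that it compiles on its own, and the names stopping, shutdown(), processedCount() and the stub Event struct are hypothetical. The flag is checked under the same mutex as the queue, so the consumer cannot miss the wake-up, and popEvent returns false once the flag is set and the queue has been drained.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for the question's Event type.
struct Event { int id; };

class Publisher {
public:
    // The consumer thread is started last, after every other member exists.
    Publisher()
        : stopping(false), processed(0),
          consumer(&Publisher::queueProcessor, this) {}

    ~Publisher() { shutdown(); }

    void pushEvent(std::shared_ptr<Event> event) {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            eventQueue.push(std::move(event));
        }
        queueConditionVariable.notify_one();
    }

    // Sets the stop flag (assumed name), wakes the consumer, and waits for it
    // to drain the queue and return. Safe to call more than once.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            stopping = true;
        }
        queueConditionVariable.notify_all();
        if (consumer.joinable()) consumer.join();
    }

    int processedCount() const { return processed.load(); }

private:
    // Blocks until an event is available or shutdown was requested.
    // Returns false only when the flag is set and the queue is empty.
    bool popEvent(std::shared_ptr<Event>& event) {
        std::unique_lock<std::mutex> lock(queueMutex);
        queueConditionVariable.wait(
            lock, [this] { return stopping || !eventQueue.empty(); });
        if (eventQueue.empty()) return false;
        event = eventQueue.front();
        eventQueue.pop();
        return true;
    }

    // Consumer loop: hands each event to its own worker thread and exits
    // once popEvent reports shutdown, after joining all workers.
    void queueProcessor() {
        std::vector<std::thread> workers;
        std::shared_ptr<Event> event;
        while (popEvent(event)) {
            workers.emplace_back([this, event] { dispatcher(event); });
        }
        for (auto& worker : workers) worker.join();
    }

    // Placeholder for the real processing and publishing step.
    void dispatcher(const std::shared_ptr<Event>&) { ++processed; }

    std::mutex queueMutex;
    std::condition_variable queueConditionVariable;
    std::queue<std::shared_ptr<Event>> eventQueue;
    bool stopping;               // guarded by queueMutex
    std::atomic<int> processed;
    std::thread consumer;        // declared last so it starts last
};
```

Producers would call pushEvent from any thread, and the main thread would call shutdown() before exiting instead of joining a loop that never returns. With Boost, the same predicate loop should work with boost::condition_variable, though that variant is untested here.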
Upvotes: 1