Silver

Reputation: 1095

C++ std condition variable covering a lot of shared variables

I am writing a multithreaded program in C++, and in my main thread I am waiting for my other threads to put packages in different queues, depending on the sort of package and which thread it originates from.

The queues are protected by mutexes, as they should be.

But in my main I don't want to be doing:

while(true)
{
    if(!queue1->empty())
    {
        // do stuff
    }
    if(!queue2->empty())
    {
        // do stuff
    }
    // etc.
}

So I need to use condition variables to signal the main thread that something has changed. Since I can only block on one condition variable, all these threads would have to share a single condition variable and its accompanying mutex. But I don't really want to use that mutex to lock all my threads: just because one thread is writing to a queue doesn't mean another can't write to a totally different queue. So I use separate mutexes for each queue. But then how do I use the additional mutex that comes with the condition variable?

How it's done with 2 threads and 1 queue using Boost, very similar to the std equivalent: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html :

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }
    }
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);
        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};
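
For reference, a roughly equivalent version using the C++11 std types (std::mutex, std::condition_variable, std::unique_lock) would be something like:

#include <condition_variable>
#include <mutex>
#include <queue>

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable std::mutex the_mutex;
    std::condition_variable the_condition_variable;
public:
    void wait_for_data()
    {
        std::unique_lock<std::mutex> lock(the_mutex);
        // predicate form of wait() handles spurious wakeups
        the_condition_variable.wait(lock, [this] { return !the_queue.empty(); });
    }
    void push(Data const& data)
    {
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            the_queue.push(data);
        }
        the_condition_variable.notify_one();
    }
    // rest as before
};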

So how do you solve this?

Upvotes: 0

Views: 1523

Answers (3)

Wandering Logic

Reputation: 3403

Both @Carleeto and @Useless have given good answers. You have only a single consumer, so a single queue will give you the best performance. You can't get higher throughput than the single consumer working constantly, so your objective should be to minimize the locking overhead of the single consumer, not the producers. You do this by having the consumer wait on a single condition variable (indicating that the queue is non-empty) with a single mutex.

Here's how you do the parametric polymorphism. Complete type safety, no casting, and only a single virtual function in the parent class:

class ParentType {
public:
  virtual void do_work(...[params]...)=0;
  virtual ~ParentType() {}
};

class ChildType1 : public ParentType {
private:
  // all my private variables and functions
public:
  virtual void do_work(...[params]...) {
    // call private functions and use private variables from ChildType1
  }
};

class ChildType2: public ParentType {
private:
  // completely different private variables and functions
public:
  virtual void do_work(...[params]...) {
    // call private functions and use private variables from ChildType2
  }
};
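
Putting that together with the single-queue suggestion, a minimal sketch might look like the following (the names below are invented for illustration, and do_work() is shown without parameters purely for brevity):

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

// One work queue shared by every producer and the single consumer.
std::queue<std::unique_ptr<ParentType>> work_queue;
std::mutex queue_mutex;
std::condition_variable queue_not_empty;

// Producer threads push whatever ChildType they build through here.
void submit(std::unique_ptr<ParentType> item)
{
    {
        std::lock_guard<std::mutex> lock(queue_mutex);
        work_queue.push(std::move(item));
    }
    queue_not_empty.notify_one();
}

// The single consumer blocks on one condition variable and dispatches
// through the virtual call, so it never has to poll several queues.
void consumer_loop()
{
    for (;;)
    {
        std::unique_ptr<ParentType> item;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            queue_not_empty.wait(lock, [] { return !work_queue.empty(); });
            item = std::move(work_queue.front());
            work_queue.pop();
        }
        item->do_work();   // runs ChildType1 or ChildType2 code
    }
}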

Upvotes: 0

Carl

Reputation: 44458

I'd say the key to your problem is here:

But I don't really want to use that mutex to lock all my threads: just because one thread is writing to a queue doesn't mean another can't write to a totally different queue. So I use separate mutexes for each queue.

Why? Because:

... packages come in relatively slow. And queues are empty most of the time

It seems to me that you've designed yourself into a corner because of something you thought you needed, when in reality you probably didn't need it: one queue would have worked in the usage scenario you mention.

I'd say start off with one queue and see how far it gets you. Then, when you run into a limitation where you really do have many threads waiting on a single mutex, you will have a lot more information about the problem and will therefore be able to solve it better.

In essence, I'd say the reason you're facing this problem is premature design optimization, and the way to fix that is to backtrack and change the design now.

Upvotes: 3

Useless

Reputation: 67743

Create a top-level (possibly circular) queue of all the queues that have work in them.

This queue can be protected by a single mutex, and have a condvar which only needs to be signalled when it changes from empty to non-empty.

Now, your individual queues can each have their own mutex, and they only need to touch the shared/top-level queue (and its mutex) when they change from empty to non-empty.

Some details will depend on whether you want your thread to take only the front item from each non-empty queue in turn, or consume each whole queue in sequence, but the idea is there.


Going from non-empty to non-empty (but increased size) should also be passed on to the top-level queue?

That depends, as I said, on how you consume them. If, every time a queue has something in it, you do this:

  1. (you already have the top-level lock, that's how you know this queue has something in it)
  2. lock the queue
  3. swap the queue contents with a local working copy
  4. remove the queue from the top-level queue
  5. unlock the queue

then a work queue is always either non-empty, and hence in the top-level queue, or empty and hence not in the queue.

If you don't do this, and just pull the front element off each non-empty queue, then you have more states to consider.
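
For example, here is a rough sketch of that scheme; the types and names are just for illustration, with int standing in for the real package type:

#include <condition_variable>
#include <deque>
#include <mutex>

// Each producer owns one of these.
struct WorkQueue
{
    std::mutex mtx;
    std::deque<int> items;
};

// Top-level queue of the queues that currently hold work.
std::mutex top_mtx;
std::condition_variable top_not_empty;
std::deque<WorkQueue*> top;

// Producer: only touch the top-level structures on the empty -> non-empty edge.
void produce(WorkQueue& q, int package)
{
    bool was_empty;
    {
        std::lock_guard<std::mutex> lock(q.mtx);
        was_empty = q.items.empty();
        q.items.push_back(package);
    }
    if (was_empty)
    {
        {
            std::lock_guard<std::mutex> lock(top_mtx);
            top.push_back(&q);
        }
        top_not_empty.notify_one();
    }
}

// Consumer: steps 1-5 above, then process the local copy with no locks held.
void consume_forever()
{
    for (;;)
    {
        std::deque<int> local;
        {
            std::unique_lock<std::mutex> top_lock(top_mtx);           // step 1
            top_not_empty.wait(top_lock, [] { return !top.empty(); });
            WorkQueue* q = top.front();
            {
                std::lock_guard<std::mutex> queue_lock(q->mtx);       // step 2
                local.swap(q->items);                                 // step 3
                top.pop_front();                                      // step 4
            }                                                         // step 5
        }
        for (int package : local)
        {
            (void)package;  // do stuff with the package
        }
    }
}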


Note that if

... packages come in relatively slow. And queues are empty most of the time

you could probably just have one queue, since there wouldn't be enough activity to cause a lot of contention. This simplifies things enormously.

Upvotes: 2
