pinfante

Reputation: 31

C++11: How to handle 2 thread-safe queues using condition variables in 1 thread

I have 2 connection objects running on their own threads, and each puts different data into its own queue, which is consumed by the main thread. So the main thread has 2 queues and needs to be woken when either queue signals that data has been pushed. I have written a thread-safe queue that encapsulates pushing, popping, and signaling the condition variable inside the threadsafe_queue. But it seems this won't work: the main loop can block waiting on the first queue while data arrives on the second queue, so it is never woken up, and vice versa.
Do I have to share the same condition variable and mutex between the 2 queues? I could modify my threadsafe_queue to take the condition variable and mutex as parameters and pass the same ones to each queue. Or I am thinking of using wait_until with a timeout on each queue, so that the other queue gets checked whenever a wait times out, but this doesn't seem efficient. The main processing thread has a lot of legacy code with static objects/variables and containers, so it can't be split into 2 threads without introducing a lot of locks. What do you think is the best way?

Upvotes: 0

Views: 424

Answers (1)

Yakk - Adam Nevraumont

Reputation: 275385

Merge the queues.
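A minimal C++11 sketch of what merging could look like, assuming both connections can push into one queue and tag each item with its origin. The names message, origin, merged_queue and demo_merged_queue are hypothetical, chosen only for illustration. With a single mutex and a single condition variable, the main thread has only one place to block:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical message type: the tag records which connection produced it.
struct message {
  enum class origin { connection_a, connection_b } from;
  std::string payload;
};

// One queue, one mutex, one condition variable shared by all producers.
class merged_queue {
public:
  void push(message m) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push(std::move(m));
    }
    ready_.notify_one();  // wake the consumer, whichever producer pushed
  }

  // Blocks until an item from either producer is available.
  message pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !items_.empty(); });
    message m = std::move(items_.front());
    items_.pop();
    return m;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<message> items_;
};

// Two producer threads, one consumer; returns true if one item of each
// origin arrived. Arrival order is not deterministic.
bool demo_merged_queue() {
  merged_queue q;
  std::thread a([&q] { q.push({message::origin::connection_a, "from a"}); });
  std::thread b([&q] { q.push({message::origin::connection_b, "from b"}); });
  int seen_a = 0, seen_b = 0;
  for (int i = 0; i < 2; ++i) {
    message m = q.pop();
    if (m.from == message::origin::connection_a) ++seen_a; else ++seen_b;
  }
  a.join();
  b.join();
  assert(seen_a + seen_b == 2);
  return seen_a == 1 && seen_b == 1;
}
```

The consumer dispatches on the tag, so the legacy processing code stays on one thread.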

Or, write a streaming system. The producers don't need to know where their data goes; it just has to go. They need a sink:

template<class T>
using sink=std::function<void(T)>;

to send their data.

The listener doesn't need to know where the data is coming from. It needs a source:

template<class T>
using source= sink<sink<T>>;

Now, they are on different threads, so you need a way to get data from the producer to the consumer.

template<class T>
struct threadsafe_queue {
  sink<T> get_sink();
  source<T> get_source();
};

In there, maintain your mutex, condition variable, and buffer.
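One way the inside of threadsafe_queue might be filled in is sketched below. This is only an illustration under some assumptions: the shared state struct, the close() member and demo_single_queue are hypothetical additions that are not part of the interface above. close() exists so that the source has a way to finish.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

template<class T>
using sink = std::function<void(T)>;

template<class T>
using source = sink<sink<T>>;

template<class T>
struct threadsafe_queue {
  // Returns a callable that pushes one value and wakes the consumer.
  sink<T> get_sink() {
    auto s = state_;
    return [s](T value) {
      {
        std::lock_guard<std::mutex> lock(s->m);
        s->buffer.push_back(std::move(value));
      }
      s->cv.notify_one();
    };
  }

  // Returns a callable that blocks and feeds each value to the given
  // consumer until the queue is closed and drained.
  source<T> get_source() {
    auto s = state_;
    return [s](sink<T> consumer) {
      for (;;) {
        std::unique_lock<std::mutex> lock(s->m);
        s->cv.wait(lock, [&s] { return s->closed || !s->buffer.empty(); });
        if (s->buffer.empty()) return;  // closed and nothing left
        T value = std::move(s->buffer.front());
        s->buffer.pop_front();
        lock.unlock();  // do not hold the lock while running user code
        consumer(std::move(value));
      }
    };
  }

  // Hypothetical addition: tells the source that no more data will arrive.
  void close() {
    {
      std::lock_guard<std::mutex> lock(state_->m);
      state_->closed = true;
    }
    state_->cv.notify_all();
  }

private:
  struct state {
    std::mutex m;
    std::condition_variable cv;
    std::deque<T> buffer;
    bool closed = false;
  };
  // Shared so that sinks and sources stay valid independently of the queue.
  std::shared_ptr<state> state_ = std::make_shared<state>();
};

// One producer thread pushes 0, 1, 2; the calling thread consumes them.
std::vector<int> demo_single_queue() {
  threadsafe_queue<int> q;
  sink<int> out = q.get_sink();
  std::thread producer([out, &q] {
    for (int i = 0; i < 3; ++i) out(i);
    q.close();
  });
  std::vector<int> received;
  q.get_source()([&received](int x) { received.push_back(x); });
  producer.join();
  assert(received.size() == 3);
  return received;
}
```

Keeping the state behind a shared_ptr is a design choice of this sketch; holding references to members would also work as long as the queue outlives every sink and source.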

Now here is the fun part. If we have X=variant<A,B>, then sink<X> can convert to sink<A>, because an A converts implicitly to an X (also source<A> can convert to source<X>).

So if thread 1 produces A and thread 2 produces B, they can both feed into a sink<X> without them even knowing.

Meanwhile the consumer thread sees either A or B coming from the queue.
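The conversion can be sketched on a single thread as follows. Note that std::variant needs C++17; under C++11 something like boost::variant would have to stand in. The types A and B, the describe visitor and demo_variant_sink are hypothetical, and in real use the merged sink would come from the queue's get_sink() rather than from a lambda writing to a vector:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <variant>
#include <vector>

template<class T>
using sink = std::function<void(T)>;

// Hypothetical payload types for the two connections.
struct A { int id; };
struct B { std::string text; };
using X = std::variant<A, B>;

// Visitor the consumer uses to tell the two alternatives apart.
struct describe {
  std::string operator()(A const& a) const { return "A:" + std::to_string(a.id); }
  std::string operator()(B const& b) const { return "B:" + b.text; }
};

std::vector<std::string> demo_variant_sink() {
  std::vector<std::string> log;

  // Stand-in for the queue's sink: accepts either alternative.
  sink<X> merged = [&log](X x) { log.push_back(std::visit(describe{}, x)); };

  // Each producer gets a sink of its own type; the A or B is converted
  // to X at the call, so neither producer needs to know about X.
  sink<A> for_thread_1 = merged;
  sink<B> for_thread_2 = merged;

  for_thread_1(A{7});
  for_thread_2(B{"hello"});

  assert(log.size() == 2);
  return log;  // {"A:7", "B:hello"}
}
```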

You can replace source<T>=sink<sink<T>> with source<T>=std::function<std::optional<T>()>, where it returns empty when done. I like sources being sinks of sinks; usage is:

void print_ints( source<int> src ) {
  src([](int x){ std::cout<<x<<','; });
  std::cout<<"\n";
}

vs my less preferred:

void print_ints( source<int> src ) {
  while (auto x = src()) { std::cout << *x << ','; }
  std::cout<<"\n";
}

As an aside, you can tag source/sink types and overload | and add pipe<In,Out> etc.

But that isn't useful here.

Upvotes: 1
