Reputation: 907
I need to implement a particular producer-consumer scenario in which the class Consumer creates two thread handling two ports, each one store a value into the corresponding port and, if a value is available on the other port, process
is called to consume the values on both ports. This is my attempt:
#include <thread>
#include <atomic>
#include <condition_variable>
#include <vector>
#include <iostream>
#include <mutex>
struct Port {
int value;
std::atomic<bool> free;
};
class Consumer {
private:
Port port1;
Port port2;
std::mutex mutex;
std::condition_variable port1_ready;
std::condition_variable port2_ready;
std::vector<std::thread> workers;
std::atomic<bool> done;
int count;
public:
Consumer() : done(false), count(0) {
port1.free = true;
port1.value = 0;
port2.free = true;
port1.value = 0;
workers.push_back(std::thread([this]{
while (!done) {
write1(rand());
}
}));
workers.push_back(std::thread([this]{
while (!done) {
write2(rand());
}
}));
}
void write1(int value) {
std::unique_lock lock(mutex);
port1.value = value;
port1.free = false;
std::cout << "port1 stored " << value << std::endl;
port1_ready.notify_one();
if (port2.free) {
port2_ready.wait(lock);
}
process("port1");
}
void write2(int value) {
std::unique_lock lock(mutex);
port2.value = value;
port2.free = false;
std::cout << "port2 stored " << value << std::endl;
port2_ready.notify_one();
if (port1.free) {
port1_ready.wait(lock);
}
process("port2");
}
void process(std::string string) {
port1.free = true;
port2.free = true;
std::cout << string << " consumed " << port1.value << " " << port2.value << std::endl;
if (count++ == 20) done = true;
}
~Consumer() {
for (auto& w: workers) {
w.join();
}
}
};
int main(int argc, char** argv) {
Consumer c{};
return 0;
}
Here is the output:
port1 stored 41
port2 stored 41
port2 consumed 41 41
port2 stored 18467
port1 consumed 41 18467
port1 stored 18467
port2 consumed 18467 18467
port2 stored 6334
port1 consumed 18467 6334
port1 stored 6334
port2 consumed 6334 6334
port2 stored 26500
port1 consumed 6334 26500
port1 stored 26500
port2 consumed 26500 26500
port2 stored 19169
port1 consumed 26500 19169
port1 stored 19169
port2 consumed 19169 19169
port2 stored 15724
port1 consumed 19169 15724
port1 stored 15724
port2 consumed 15724 15724
port2 stored 11478
port1 consumed 15724 11478
port1 stored 11478
port2 consumed 11478 11478
port2 stored 29358
port1 consumed 11478 29358
port1 stored 29358
port2 consumed 29358 29358
port2 stored 26962
port1 consumed 29358 26962
port1 stored 26962
port2 consumed 26962 26962
port2 stored 24464
port1 consumed 26962 24464
port1 stored 24464
port2 consumed 24464 24464
port2 stored 5705
port1 consumed 24464 5705
port1 stored 5705
port2 consumed 5705 5705
Sometimes it will successfully returns others will stuck into a deadlock.
Upvotes: 0
Views: 56
Reputation: 118300
Your logical error can be easily observed by noting the following:
void process(std::string string) {
port1.free = true;
port2.free = true;
This clearly shows that your intent is to consider the values that are now placed in both "port"s to be "processed". That is, once a value is placed into both "port"s, both values get "processed", and both ports are "free" again.
Yet, observe the very beginning of your log:
port1 stored 41
port2 stored 41
port2 consumed 41 41
So far so good, 41 was placed in both ports, and the port2
process ended up "process"ing them. But immediately afterwards:
port2 stored 18467
port1 consumed 41 18467
Things have pretty much gone off the rails at this point. 41 was already "process"ed, and obviously should not be "process"ed again.
Print the contents of your write1()
and write2()
on two sheets of paper. Use the index finger of your left hand to trace the execution of the write1()
thread, and the index finger of your right hand to trace the execution of the write2()
thread.
Start with your right hand, tracing write2()
as it locks the mutex, and does its business, and discovers that
if (port2.free) {
is true, then
port2_ready.wait(lock);
waits on this condition variable. This unblocks the mutex, and your left index finger now can move forward. Your left index finger now moves forward until:
port2_ready.notify_one();
This notifies the other thread, which has to wait until the mutex gets unlocked, so it waits while the right index finger continues moving:
if (port1.free) {
This is true, so now the first thread can get moving. And if you follow along you can see how both threads end up getting into process()
, instead of just one of them. Fail.
This logic is fundamentally broken. There are several ways to do this correctly, but the simplest one would be as follows. When either thread acquires the mutex, it
Checks if the port owned by the thread already has a value in it (from the last time it was called).
If the port already has a value, wait on the condition variable until the port is free (relying on the other thread to clear it).
If the port is free, save its value in the thread's port, then check if the other thread's port already has a value. If not, the thread can return and proceed on its business getting the next value, being assured that the other thread will take care of processing both saved values.
Otherwise both ports have values, call process()
to process them, clear both ports, and signal the other thread's condition variable, letting it know that if it was waiting for its port to be free, it's now free for saving the other thread's next value.
Upvotes: 1
Reputation: 217145
You call process
even when one of port1.free
and port2.free
is true.
You might change your code to something like:
struct Port {
std::optional<int> value;
};
class Consumer {
private:
Port port1;
Port port2;
std::mutex mutex;
std::condition_variable port1_ready;
std::condition_variable port2_ready;
std::vector<std::thread> workers;
std::atomic<bool> done;
int count;
public:
Consumer() : done(false), count(0) {
std::random_device rd;
workers.push_back(std::thread([this, gen = std::mt19937{rd()}] () mutable {
while (!done) {
write1(gen());
}
}));
workers.push_back(std::thread([this, gen = std::mt19937{rd()}] () mutable {
while (!done) {
write2(gen());
}
}));
port1_ready.notify_one();
port2_ready.notify_one();
}
void write1(int value) {
std::unique_lock lock(mutex);
port1_ready.wait(lock, [&](){ return !port1.value; });
port1.value = value;
std::cout << "port1 stored " << value << std::endl;
if (port2.value) {
process("port1");
}
}
void write2(int value) {
std::unique_lock lock(mutex);
port2_ready.wait(lock, [&](){ return !port2.value; });
port2.value = value;
std::cout << "port2 stored " << value << std::endl;
if (port1.value) {
process("port2");
}
}
void process(std::string string) {
std::cout << string << " consumed " << *port1.value << " " << *port2.value << std::endl;
port1.value.reset();
port2.value.reset();
port1_ready.notify_one();
port2_ready.notify_one();
if (count++ == 20) done = true;
}
~Consumer() {
for (auto& w: workers) {
w.join();
}
}
};
int main() {
Consumer c{};
}
Upvotes: 1