Reputation: 93
So I am trying to implement a double buffer for a typical producer and consumer problem.
1. get_items() produces 10 items at a time.
2. The producer pushes those 10 items onto a write queue. Assume for now that there is only one producer.
3. Each consumer consumes one item from the queue. There are many consumers.
I am sharing my code below. The idea is simple: consume from the read queue until it is empty, then swap the queue pointers, so that readq points to the (now filled) write queue and writeq points to the emptied queue, which the producer starts filling again. This way the producer and consumers can work independently without blocking each other, trading space for time.
However, my code does not work with multiple consumers. I start 10 consumer threads, and the program always gets stuck at .join().
So my code is definitely buggy, but even after examining it carefully I could not find the bug. It seems to get stuck after lk1.unlock(), so it is not stuck in a while loop or anything else obvious.
#include <chrono>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

using namespace std;

mutex m1;
mutex m2; // using 2 mutexes, so when the producer is locked, consumers can still run
condition_variable put;
condition_variable fetch;
queue<int> q1;
queue<int> q2;
queue<int>* readq = &q1;
queue<int>* writeq = &q2;
bool flag{ true };

vector<int> get_items() {
    vector<int> res;
    for (int i = 0; i < 10; i++) {
        res.push_back(i);
    }
    return res;
}

void producer_mul() {
    unique_lock<mutex> lk2(m2);
    put.wait(lk2, [&]() { return flag == false; }); // producer waits for consumer signal
    vector<int> items = get_items();
    for (auto it : items) {
        writeq->push(it);
    }
    flag = true; // signal that the queue is filled
    fetch.notify_one();
    lk2.unlock();
}

int get_one_item_mul() {
    unique_lock<mutex> lk1(m1);
    int res;
    if (!(*readq).empty()) {
        res = (*readq).front(); (*readq).pop();
        if ((*writeq).empty() && flag == true) { // if writeq is empty
            flag = false;
            put.notify_one();
        }
    }
    else {
        readq = writeq; // swap queue pointer
        while ((*readq).empty()) { // not yet written
            if (flag) {
                flag = false;
                put.notify_one(); // start the filling process
            }
            //if (readq->empty()) { // updated due to a race: readq now points to writeq, so if the producer finished, readq is not empty and flag == true
            fetch.wait(lk1, [&]() { return flag == true; });
            //}
        }
        if (flag) {
            writeq = writeq == &q1 ? &q2 : &q1; // swap writeq to the alternative queue and fill it again
            flag = false;
            //put.notify_one(); // fill that queue again if needed; in my case 10 items are produced and consumed, so there is no need for a 2nd round. Plus the code is not working even in this simple case, so it is commented out for now.
        }
        res = readq->front(); readq->pop();
    }
    lk1.unlock();
    this_thread::sleep_for(10ms);
    return res;
}

int main()
{
    std::vector<std::thread> threads;
    std::packaged_task<void(void)> job1(producer_mul);
    vector<std::future<int>> res;
    for (int i = 0; i < 10; i++) {
        std::packaged_task<int(void)> job2(get_one_item_mul);
        res.push_back(job2.get_future());
        threads.push_back(std::thread(std::move(job2)));
    }
    threads.push_back(std::thread(std::move(job1)));
    for (auto& t : threads) {
        t.join();
    }
    for (auto& a : res) {
        cout << a.get() << endl;
    }
    return 0;
}
I added some comments, but the idea and code are pretty simple and self-explanatory.
I am trying to figure out where the problem in my code is. Does it work for multiple consumers? Furthermore, would it work with multiple producers? I do not see why not, since the locking is not fine-grained: both the producer and the consumer hold a lock from beginning to end.
Looking forward to a discussion; any help is appreciated.
Update
Fixed the race condition based on one of the answers. The program is still not working.
Upvotes: 0
Views: 1104
Reputation: 52621
Your program contains data races, and therefore exhibits undefined behavior. I see at least two:

producer_mul accesses and modifies flag while holding the m2 mutex but not m1. get_one_item_mul accesses and modifies flag while holding the m1 mutex but not m2. So flag is not in fact protected against concurrent access.
Similarly, producer_mul accesses the writeq pointer while holding the m2 mutex but not m1; get_one_item_mul modifies writeq while holding the m1 mutex but not m2.
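To see why two different mutexes provide no protection, here is a minimal, self-contained sketch (my own example, not taken from your code): two threads write to the same variable, each under its own lock, and the writes still constitute a data race that a tool like ThreadSanitizer will report.

#include <mutex>
#include <thread>

int shared = 0;     // shared variable, analogous to flag or writeq
std::mutex a, b;    // two unrelated mutexes, analogous to m1 and m2

int main() {
    std::thread t1([] { std::lock_guard<std::mutex> lk(a); shared = 1; }); // "protected" by a
    std::thread t2([] { std::lock_guard<std::mutex> lk(b); shared = 2; }); // "protected" by b
    t1.join();
    t2.join();
    // The two writes are not ordered by any common lock, so this is a data race.
}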
There's also a data race on the queues themselves. Initially, both queues are empty and producer_mul is blocked waiting on flag. Then the following sequence occurs (P for the producer thread, C for a consumer thread):

C: readq = writeq;                    // Both now point to the same queue
C: flag = false; put.notify_one();    // This wakes up the producer
P: writeq->push(it);
C: if (readq->empty())

The last two lines happen concurrently, with no protection against concurrent access. One thread modifies an std::queue instance while the other accesses that same instance. This is a data race.
There's a data race at the heart of the design. Let's imagine there's just one producer P and two consumers C1 and C2. Initially, P waits on put until flag == false. C1 grabs m1; C2 is blocked on m1.

C1 sets readq = writeq, then unblocks P, then calls fetch.wait(lk1, [&]() {return flag == true; });. This unlocks m1, allowing C2 to proceed. So now P is busy writing to writeq while C2 is busy reading from readq, which is one and the same queue.
Upvotes: 2