Reputation: 3869
I have two threads running one is a producer which produces the data and put it in a queue and the other is a consumer which consumes that data. I would like the producer to produce the data with a delay of a few seconds but do not want the consumer to be waiting it should process the data asynchronously as soon as it is available without a delay. Here is the code.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>
std::mutex mutex;
std::condition_variable cond;
std::queue<int> buffer;
void producer(int val)
{
while(val) {
std::unique_lock locker(mutex);
cond.wait(locker, []() {
return buffer.size() < 50;
});
// add pulse
std::this_thread::sleep_for(std::chrono::seconds(1));
buffer.push(val);
std::cout << "Produced " << val << std::endl;
val --;
locker.unlock();
cond.notify_one();
}
}
void consumer()
{
while(true) {
std::unique_lock locker(mutex);
cond.wait(locker, [](){
return buffer.size() > 0;
});
int val = buffer.front();
std::cout << "Consumer " << val << std::endl;
buffer.pop();
locker.unlock();
cond.notify_one();
}
}
int main()
{
std::thread t1(producer, 50);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
The Result
NOT Running Parallel with std::this_thread::sleep_for(std::chrono::seconds(3));
Produced 10
Produced 9
Produced 8
Produced 7
Produced 6
Produced 5
Produced 4
Produced 3
Produced 2
Produced 1
Consumer 10
Consumer 9
Consumer 8
Consumer 7
Consumer 6
Consumer 5
Consumer 4
Consumer 3
Consumer 2
Without Wait
Running Parallel
Produced 40
Produced 39
Produced 38
Produced 37
Produced 36
Produced 35
Produced 34
Produced 33
Produced 32
Produced 31
Produced 30
Produced 29
Produced 28
Produced 27
Produced 26
Produced 25
Produced 24
Produced 23
Produced 22
Produced 21
Produced 20
Produced 19
Produced 18
Produced 17
Produced 16
Produced 15
Produced 14
Consumer 40
Consumer 39
Consumer 38
Consumer 37
Consumer 36
Consumer 35
Consumer 34
Consumer 33
Consumer 32
Consumer 31
Consumer 30
Consumer 29
Consumer 28
Consumer 27
Consumer 26
Consumer 25
Consumer 24
Consumer 23
Consumer 22
Consumer 21
Consumer 20
Consumer 19
Consumer 18
Consumer 17
Consumer 16
Consumer 15
Produced 13
Produced 12
Produced 11
Produced 10
Produced 9
Produced 8
Produced 7
Produced 6
Produced 5
Produced 4
Produced 3
Produced 2
Produced 1
Consumer 14
Consumer 13
Consumer 12
Consumer 11
Consumer 10
Consumer 9
Consumer 8
Consumer 7
Consumer 6
Consumer 5
Consumer 4
Consumer 3
Consumer 2
The Problem
It works fine when I do not make the producer wait but when I add delay using std::this_thread::sleep_for(std::chrono::seconds(3));
The problem is the consumer won't start processing the data until the producer has produced all of the data. The question is how can I not make the consumer wait? Or how can I add an async wait in the producer?
Upvotes: 3
Views: 640
Reputation: 118340
std::this_thread::sleep_for
gets called while the mutex is still held. This prevents the other execution thread from doing anything, it's as simple as that. The mutex is still locked. When a thread has locked a mutex and then is waiting on a condition variable, the execution thread will not resume, even if the condition variable is signaled, until it can re-lock the mutex (after being notified). And it's quite difficult to do so if the other execution thread keeps it locked.
It is true that the producer later unlocks the mutex and then signals the condition variable. However the very next thing that happens is that it re-locks the mutex at the next iteration of the loop. This raises the next issue: even if a thread is waiting on a condition variable, when signaled it is not guaranteed that it will be able to re-lock upon the mutex upon receiving the signal. The only thing that you're guaranteed is that at some point after the condition variable get signaled the waiting thread will wake up and do so, but this is not an indivisible operation.
Waiting on a condition variable and unlocking its mutex is an indivisible operation, but the opposite is not true. It is not guaranteed that signalling a condition variable, with another execution thread receiving the signal and re-locking a mutex is an indivisible operation.
What's happening here is that the producer simply re-locks the mutex fast enough to prevent the consumer from re-locking the mutex and returning from std::condition_variable::wait
.
The simplest solution is to sleep_for
after unlocking the mutex.
Additionally, with classical mutex semantics the condition variable should be signaled while the mutex is still locked, and not after unlocking it. This doesn't make much of a difference here but there are some edge cases where it does matter, and it is simply easier to always use the same signaling/locking order instead of attempting to figure out if in each particular case it does not matter.
#include <queue>
#include <string>
std::mutex mutex;
std::condition_variable cond;
std::queue<int> buffer;
void producer(int val)
{
while(val) {
std::unique_lock locker(mutex);
cond.wait(locker, []() {
return buffer.size() < 50;
});
buffer.push(val);
std::cout << "Produced " << val << std::endl;
val --;
cond.notify_one();
locker.unlock();
// add pulse
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void consumer()
{
while(true) {
std::unique_lock locker(mutex);
cond.wait(locker, [](){
return buffer.size() > 0;
});
int val = buffer.front();
std::cout << "Consumer " << val << std::endl;
buffer.pop();
cond.notify_one();
locker.unlock();
}
}
int main()
{
std::thread t1(producer, 50);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
I should also note that the producer's check for the buffer's size to be smaller than 50 is now a moot point. The consumer will have no problems keeping up with the producer, any more.
Upvotes: 1