Reputation: 11600
In a very simple consumer/producer test, my consumer always gets an empty queue. And I can't figure out where I'm wrong. It's a very simple test program so I hope someone with a sharp eye could give me some help here.
#include <string>
#include <cstdio>
#include <cstdlib> /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>
std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;
void updateLog() {
std::lock_guard<std::mutex> lock(gMutex);
char msg[256];
int rnd = rand()%100 + 1;
sprintf(msg, "hello: %d", rnd);
gMsgs.push_back(std::string(msg));
}
void produce() {
srand (time(NULL));
// produce every 5ms
timespec ts = {0, 5*1000000};
//
gThread = std::thread([&]() {
while(true) {
updateLog();
nanosleep(&ts, NULL);
}
});
printf("log thread created.");
}
void consume() {
// consume every 10ms
timespec ts = {0, 10*1000000};
//
while (true) {
std::lock_guard<std::mutex> lock(gMutex);
std::string log;
unsigned int N = gMsgs.size();
// consume all data in queue at the moment
for (unsigned int m = 0; m < N; ++m) {
log += gMsgs[m]+"\n";
}
// remove already consumed data
for(unsigned int m=0; m<N; ++m) {
gMsgs.pop_front();
}
if (log.empty()) {
log = "EMPTY";
}
printf("log: %s\n", log.c_str());
nanosleep(&ts, NULL);
}
}
int main()
{
produce();
consume();
}
So I have my producer run in a background thread, and keep pushing new string to the queue, at a faster pace. my consumer in main thread keeps taking data out of the queue, at a slower rate.
I expect that the queue shouldn't be empty and the consumer should always have something to eat at any moment.
The print out is always "EMPTY", meaning there is nothing in queue.
What's wrong?
I took the advice from @john and moved nanosleep
out of lock guard, but the result is pretty much the same, see the updated code.
// Example program
#include <string>
#include <cstdio>
#include <cstdlib> /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>
std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;
void updateLog() {
std::lock_guard<std::mutex> lock(gMutex);
char msg[256];
int rnd = rand()%100 + 1;
sprintf(msg, "hello: %d", rnd);
gMsgs.push_back(std::string(msg));
}
void produce() {
srand (time(NULL));
int rnd = -1;
timespec ts = {0, 5*1000000};
gThread = std::thread([&]() {
while(true) {
updateLog();
nanosleep(&ts, NULL);
}
});
printf("log thread created.\n");
}
void do_consume() {
std::lock_guard<std::mutex> lock(gMutex);
std::string log;
unsigned int N = gMsgs.size();
for (unsigned int m = 0; m < N; ++m) {
log += gMsgs[m]+"\n";
}
for(unsigned int m=0; m<N; ++m) {
gMsgs.pop_front();
}
if (log.empty()) {
log = "EMPTY";
}
printf("log: %s\n", log.c_str());
}
void consume() {
timespec ts = {0, 10*1000000};
while (true) {
do_consume();
nanosleep(&ts, NULL);
}
}
int main()
{
produce();
consume();
}
and the result
log thread created.
log: EMPTY
log: hello: 2
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
Upvotes: 0
Views: 212
Reputation: 11600
Just wanna echo what @user4581301 and @PaulMcKenzie suggested, indeed std::contional_variable
is the way to go. And I removed the nanosleep()
calls.
The updated code:
// Example program
#include <condition_variable>
#include <string>
#include <cstdio>
#include <cstdlib> /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>
std::deque<std::string> gMsgs;
std::mutex gMutex;
std::condition_variable gNotify;
bool gConsumerCanConsume = false;
std::thread gThread;
void updateLog() {
std::unique_lock<std::mutex> lock(gMutex);
char msg[256];
int rnd = rand()%100 + 1;
sprintf(msg, "hello: %d", rnd);
gMsgs.push_back(std::string(msg));
gConsumerCanConsume = true;
lock.unlock();
gNotify.notify_one();
}
void produce() {
srand (time(NULL));
timespec ts = {0, 5*1000000};
gThread = std::thread([&]() {
while(true) {
updateLog();
}
});
printf("log thread created.\n");
}
void do_consume() {
{
std::unique_lock<std::mutex> lock(gMutex);
gNotify.wait(lock, []{return gConsumerCanConsume;});
}
std::string log;
unsigned int N = gMsgs.size();
for (unsigned int m = 0; m < N; ++m) {
log += gMsgs[m]+"\n";
}
for(unsigned int m=0; m<N; ++m) {
gMsgs.pop_front();
}
if (log.empty()) {
log = "EMPTY";
}
printf("log: %s\n", log.c_str());
}
void consume() {
timespec ts = {0, 10*1000000};
while (true) {
do_consume();
}
}
int main()
{
produce();
consume();
}
and the result
log thread created.
log: hello: 56
hello: 30
hello: 58
hello: 16
hello: 73
hello: 33
hello: 92
hello: 20
hello: 80
hello: 34
hello: 80
hello: 54
hello: 68
hello: 20
hello: 86
hello: 67
...
Upvotes: 0