kakyo
kakyo

Reputation: 11600

C++ deque consumer always gets empty queue from producer

In a very simple consumer/producer test, my consumer always gets an empty queue. And I can't figure out where I'm wrong. It's a very simple test program so I hope someone with a sharp eye could give me some help here.

#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;

void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}

void produce() {
    srand (time(NULL));

    // produce every 5ms

    timespec ts = {0, 5*1000000};

    //
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.");
}

void consume() {

    // consume every 10ms

    timespec ts = {0, 10*1000000};

    //

    while (true) {
        std::lock_guard<std::mutex> lock(gMutex);
        std::string log;
        unsigned int N = gMsgs.size();

        // consume all data in queue at the moment

        for (unsigned int m = 0; m < N; ++m) {
            log += gMsgs[m]+"\n";
        }

        // remove already consumed data

        for(unsigned int m=0; m<N; ++m) {
            gMsgs.pop_front();
        }

        if (log.empty()) {
            log = "EMPTY";
        }

        printf("log: %s\n", log.c_str());
        nanosleep(&ts, NULL);
    }
}

int main()
{
  produce();
  consume();
}


So I have my producer run in a background thread, and keep pushing new string to the queue, at a faster pace. my consumer in main thread keeps taking data out of the queue, at a slower rate.

Expectation

I expect that the queue shouldn't be empty and the consumer should always have something to eat at any moment.

Observed

The print out is always "EMPTY", meaning there is nothing in queue.

What's wrong?

UPDATE

I took the advice from @john and moved nanosleep out of lock guard, but the result is pretty much the same, see the updated code.

// Example program

#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;

void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}

void produce() {
    srand (time(NULL));

    int rnd = -1;
    timespec ts = {0, 5*1000000};
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.\n");
}

void do_consume() {
    std::lock_guard<std::mutex> lock(gMutex);
    std::string log;
    unsigned int N = gMsgs.size();
    for (unsigned int m = 0; m < N; ++m) {
        log += gMsgs[m]+"\n";
    }
    for(unsigned int m=0; m<N; ++m) {
        gMsgs.pop_front();
    }

    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}

void consume() {
    timespec ts = {0, 10*1000000};
    while (true) {
        do_consume();
        nanosleep(&ts, NULL);
    }
}

int main()
{
  produce();
  consume();
}

and the result

log thread created.
log: EMPTY
log: hello: 2

log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY

Upvotes: 0

Views: 212

Answers (1)

kakyo
kakyo

Reputation: 11600

Just wanna echo what @user4581301 and @PaulMcKenzie suggested, indeed std::contional_variable is the way to go. And I removed the nanosleep() calls.

The updated code:

// Example program
#include <condition_variable>
#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::condition_variable gNotify;
bool gConsumerCanConsume = false;
std::thread gThread;

void updateLog() {
    std::unique_lock<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
    gConsumerCanConsume = true;
    lock.unlock();
    gNotify.notify_one();
}

void produce() {
    srand (time(NULL));

    timespec ts = {0, 5*1000000};
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
        }
    });
    printf("log thread created.\n");
}

void do_consume() {
    {
        std::unique_lock<std::mutex> lock(gMutex);
        gNotify.wait(lock, []{return gConsumerCanConsume;});
    }

    std::string log;
    unsigned int N = gMsgs.size();
    for (unsigned int m = 0; m < N; ++m) {
        log += gMsgs[m]+"\n";
    }
    for(unsigned int m=0; m<N; ++m) {
        gMsgs.pop_front();
    }
    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}

void consume() {
    timespec ts = {0, 10*1000000};
    while (true) {
        do_consume();
    }
}

int main()
{
  produce();
  consume();
}

and the result


log thread created.
log: hello: 56
hello: 30
hello: 58
hello: 16
hello: 73
hello: 33
hello: 92
hello: 20
hello: 80
hello: 34
hello: 80
hello: 54
hello: 68
hello: 20
hello: 86
hello: 67

...

Upvotes: 0

Related Questions