Evan Douglas

Reputation: 11

C++ Producer Consumer, same consumer thread grabs all tasks

I am implementing a producer-consumer project in C++. When I run the program, the same consumer thread grabs almost all of the work and the other consumer threads get none. Sometimes another thread does get some work, but then that thread takes over for a while. For example, TID 10 might grab almost all of the work, then all of a sudden TID 12 takes over, with no other consumer threads getting work in between.

Any idea why other threads wouldn't have a chance to grab work?

#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <chrono>
#include <cstdlib>
#include <csignal>
#include <unistd.h>

using namespace std;

int max_queue_size = 100;
int num_producers = 5;
int num_consumers = 7;
int num_operations = 40;

int operations_created = 0;
thread_local int operations_created_by_this_thread = 0;

int operations_consumed = 0;
thread_local int operations_consumed_by_this_thread = 0;

struct thread_stuff {
    int a;
    int b;
    int operand_num;
    char operand;
};
char operands[] = {'+', '-', '/', '*'};

deque<thread_stuff> q;
bool finished = false;

condition_variable cv;
mutex queue_mutex;

void producer(int n) {
    while (operations_created_by_this_thread < num_operations) {
        int oper_num = rand() % 4;
        thread_stuff equation;
        equation.a = rand();
        equation.b = rand();
        equation.operand_num = oper_num;
        equation.operand = operands[oper_num];


        while ((operations_created - operations_consumed) >= max_queue_size) {
            // don't do anything until it has space available
        }
        {
            lock_guard<mutex> lk(queue_mutex);
            q.push_back(equation);
            operations_created++;
        }
        cv.notify_all();
        operations_created_by_this_thread++;
        this_thread::sleep_for(chrono::seconds(rand() % 2));
    }
    {
        lock_guard<mutex> lk(queue_mutex);
        if(operations_created == num_operations * num_producers){
            finished = true;
        }
    }
    cv.notify_all();
}

void consumer() {
    while (true) {
        unique_lock<mutex> lk(queue_mutex);
        cv.wait(lk, [] { return finished || !q.empty(); });
        if(!q.empty()) {
            thread_stuff data = q.front();
            q.pop_front();
            operations_consumed++;
            operations_consumed_by_this_thread++;
            int ans = 0;
            switch (data.operand_num) {
                case 0:
                    ans = data.a + data.b;
                    break;
                case 1:
                    ans = data.a - data.b;
                    break;
                case 2:
                    ans = data.a / data.b;
                    break;
                case 3:
                    ans = data.a * data.b;
                    break;
            }
            cout << "Operation " << operations_consumed << " processed by PID " << getpid()
                 << " TID " << this_thread::get_id() << ": "
                 << data.a << " " << data.operand << " " << data.b << " = " << ans << " queue size: "
                 << (operations_created - operations_consumed) << endl;
        }
        this_thread::yield();
        if (finished) break;
    }
}

void usr1_handler(int signal) {
    cout << "Status: Produced " << operations_created << " operations and "
         << (operations_created - operations_consumed) << " operations are in the queue" << endl;
}

void usr2_handler(int signal) {
    cout << "Status: Consumed " << operations_consumed << " operations and "
         << (operations_created - operations_consumed) << " operations are in the queue" << endl;
}

int main(int argc, char *argv[]) {
    if (argc < 5) {
        cout << "Invalid number of parameters passed in" << endl;
        exit(1);
    }
    max_queue_size = atoi(argv[1]);
    num_operations = atoi(argv[2]);
    num_producers = atoi(argv[3]);
    num_consumers = atoi(argv[4]);

//    signal(SIGUSR1, usr1_handler);
//    signal(SIGUSR2, usr2_handler);
    thread producers[num_producers];
    thread consumers[num_consumers];
    for (int i = 0; i < num_producers; i++) {
        producers[i] = thread(producer, num_operations);
    }
    for (int i = 0; i < num_consumers; i++) {
        consumers[i] = thread(consumer);
    }

    for (int i = 0; i < num_producers; i++) {
        producers[i].join();
    }
    for (int i = 0; i < num_consumers; i++) {
        consumers[i].join();
    }
    cout << "finished!" << endl;
}

Upvotes: 0

Views: 596

Answers (3)

Andriy Berestovskyy

Reputation: 8534

There are a few issues with your code:

Using Normal Variables for Inter-Thread Communication

Here is an example:

int operations_created = 0;
int operations_consumed = 0;

void producer(int n) {
    [...]
    while ((operations_created - operations_consumed) >= max_queue_size) { }

and later

void consumer() {
    [...]
    operations_consumed++;

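Reading and writing plain int variables from several threads without synchronization is a data race, which is undefined behavior in C++. It may appear to work on x86 without optimizations, i.e. -O0, but once optimizations are enabled, the compiler is free to transform the while loop into: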

void producer(int n) {
    [...]
    if ((operations_created - operations_consumed) >= max_queue_size) {
        while (true) { }
    }

So your program simply hangs here. You can check this on Compiler Explorer:

  mov eax, DWORD PTR operations_created[rip]
  sub eax, DWORD PTR operations_consumed[rip]
  cmp eax, DWORD PTR max_queue_size[rip]
  jl .L19 // here is the if before the loop
.L20:
  jmp .L20 // here is the empty loop
.L19:

Why is this happening? From the point of view of a single-threaded program, while (condition) { operators } is exactly equivalent to if (condition) while (true) { operators } as long as the operators do not change the condition.

To fix the issue, we should use std::atomic<int> instead of plain int. Atomics are designed for inter-thread communication, so the compiler will not apply such optimizations and will generate the correct assembly.
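
As a minimal sketch of that change, assuming the counters keep their names from the question and the wait loop is pulled out into a helper (wait_for_space is a hypothetical name, not something from the original code):

```cpp
#include <atomic>
#include <thread>

// Shared counters, now atomic: every load really reads the current value,
// so the compiler cannot hoist the check out of the loop.
std::atomic<int> operations_created{0};
std::atomic<int> operations_consumed{0};
int max_queue_size = 100;  // assumed to be set before any thread starts

// Hypothetical helper: spins until the queue has room for one more item.
void wait_for_space() {
    while (operations_created.load() - operations_consumed.load() >= max_queue_size) {
        // still a busy wait, but now a well-defined one
    }
}
```

The producer would call wait_for_space() where the original empty while loop was. This only removes the undefined behavior; the loop still burns a core while it waits.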

Consumer Holds the Mutex During yield()

Have a look at this snippet:

void consumer() {
    while (true) {
        unique_lock<mutex> lk(queue_mutex);
        [...]
        this_thread::yield();
        [...]
    }

Basically, this means that the consumer calls yield() while holding the lock. Since only one consumer can hold the lock at a time (mutex stands for mutual exclusion), that explains why the other consumers cannot consume the work.

To fix this issue, we should unlock the queue_mutex before the yield(), i.e.:

void consumer() {
    while (true) {
        {
            unique_lock<mutex> lk(queue_mutex);
            [...]
        }
        this_thread::yield();
        [...]
    }

This still does not prevent one thread from doing most of the tasks. When we call notify_all() in the producer, all waiting threads are woken up, but only one of them will lock the mutex. Since the work we schedule is tiny, by the time the producer calls notify_all() again, that thread will have finished the work, called yield(), and be ready for the next item.

So why does this thread lock the mutex rather than one of the others? My guess is that it comes down to the CPU cache and busy waiting. The thread that has just finished the work is "hot": it is in the CPU cache and ready to lock the mutex. Before going to sleep it might also busy-wait on the mutex for a few cycles, which increases its chances of winning even more.

To fix this, we can either remove the sleep in the producer (so it wakes up the other threads more often and they stay "hot" as well), or sleep in the consumer instead of calling yield() (so that thread becomes "cold" during the sleep).
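
A rough sketch of the second option, combined with the scoped lock, might look like the following. It assumes a simplified queue of int instead of thread_stuff, a 1 ms sleep chosen arbitrarily, and a consumer that returns how many items it handled (that return value is added here only for illustration):

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

std::deque<int> q;  // stand-in for deque<thread_stuff>
bool finished = false;
std::mutex queue_mutex;
std::condition_variable cv;

// Pops items until the queue is drained and finished is set.
// Returns the number of items this thread handled.
int consumer() {
    int handled = 0;
    while (true) {
        int item = 0;
        {
            std::unique_lock<std::mutex> lk(queue_mutex);
            cv.wait(lk, [] { return finished || !q.empty(); });
            if (q.empty()) {
                return handled;  // finished and nothing left to drain
            }
            item = q.front();
            q.pop_front();
        }  // queue_mutex is released here
        (void)item;  // the real computation on item would go here, outside the lock
        ++handled;
        // Sleep instead of yield() so this thread goes "cold" for a moment.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
```

Whether this actually evens out the distribution depends on the scheduler and the platform's mutex implementation, so treat the sleep duration as something to experiment with.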

Anyway, the work cannot be done in parallel because of the mutex, so the fact that the same thread does most of the work is completely natural IMO.

Upvotes: 0

2785528

Reputation: 5566

Any idea why other threads wouldn't have a chance to grab work?

This poll is troubling:

while ((operations_created - operations_consumed) >= max_queue_size) 
{
   // don't do anything until it has space available
}

You might try a minimal delay in the loop. As written, it is a 'bad neighbor' and can 'consume' a core.
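
For illustration, here is a sketch of such a delay. It assumes the two counters are made std::atomic<int> so that polling them from another thread is well defined, and the helper name wait_for_space and the 100 microsecond interval are arbitrary choices, not part of the original code:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Assumption: the counters are atomic so they can be polled safely.
std::atomic<int> operations_created{0};
std::atomic<int> operations_consumed{0};
int max_queue_size = 100;  // assumed to be set before any thread starts

// Hypothetical helper: polls for free space, sleeping briefly between
// checks so the waiting producer does not monopolize a core.
void wait_for_space() {
    while (operations_created.load() - operations_consumed.load() >= max_queue_size) {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
    }
}
```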

Upvotes: 1

Humphrey Winnebago

Reputation: 1682

You're holding the mutex the whole time--including yield()-ing while holding the mutex.

Scope the unique_lock like you do in your producer's code, popping from the queue and incrementing the counter atomically.

I see that you have a max queue size. You need a 2nd condition for the producer to wait on if the queue is full, and the consumer will signal this condition as it consumes items.

Upvotes: 1
