Paul
Paul

Reputation: 31

C++ Multithreading - Channels with Condition Variables

I am trying to include condition variables and channels in my multithreaded program, and have made a basic program to try and understand how they work. In this program, one thread will add numbers 0 to 9 to the channel buffer, and the other thread will display each number and pop it from the buffer.

Currently, the program runs but nothing is displayed. I suspect threads are waiting on a resource and so have entered deadlock, but I'm not sure how to fix this.

Source.cpp (threads are called):

#include "channel.h"
#include <iostream>
// Single channel instance shared by the two thread functions below:
// function1() writes to it and function2() reads from it.
channel channel1;

// Producer side: writes the decimal strings "0" through "9" to channel1,
// one write() call per number, in ascending order.
void function1() {
    int next = 0;
    while (next < 10) {
        channel1.write(to_string(next));
        ++next;
    }
}

void function2() {
    string val;
    for (int i = 0; i < 10; i++) {
        val = channel1.read();
        cout << val << "\n";
    }
}

    // Entry point: starts function1 and function2 on their own threads and
    // waits for both to finish. Declared as `int main()` because the C++
    // standard requires main to return int (`void main()` is a compiler
    // extension that many toolchains reject).
    int main() {
        thread t1(function1);
        thread t2(function2);
        t1.join();
        t2.join();
        return 0;
    }

channel.h (Methods for writing to/reading from buffer):

#pragma once
#include <mutex>
#include <list>
#include <string>
using namespace std;

// Shorthand for the lock type that is handed to condition_variable::wait().
typedef unique_lock<mutex> mutex_lock;
// String channel backed by a list: write() appends an item and notifies,
// read() waits on the condition variable and then removes the front item.
// NOTE(review): condition_variable is used below, but the includes above are
// only <mutex>, <list> and <string> -- <condition_variable> appears to be
// missing from this header.
class channel {
public:
    // Queued items; write() pushes at the back, read() pops from the front.
    list<string> buffer;
    mutex buffer_mutex; // controls access to buffer
    // Notified by write() after every push; waited on by read().
    condition_variable cv;

    // Appends `data` to the buffer while holding buffer_mutex, then wakes
    // every thread currently blocked in cv.wait().
    void write(string data) {
        mutex_lock lock(buffer_mutex);
        buffer.push_back(data);
        // A notification only reaches threads that are already waiting; it is
        // not remembered for a thread that starts waiting later.
        cv.notify_all();
    }

    // Waits for a notification, then removes and returns the front item.
    string read() {
        string item = "";
        // The body returns on its first pass, so this loop never repeats.
        while (item == "") {
            mutex_lock lock(buffer_mutex);
            // NOTE(review): the wait is unconditional and has no predicate.
            // If write() has already issued its notify_all() calls before this
            // thread gets here, nothing wakes it again even though buffer
            // holds data -- this matches the reported hang with no output.
            cv.wait(lock);
            // NOTE(review): this declares a second `item` that shadows the
            // outer one, so the loop condition's variable is never updated.
            // Nothing checks that buffer is non-empty before front() is
            // called, which matters if the wait returns spuriously.
            string item = buffer.front();
            buffer.pop_front();
            return item;
        }
    }
};

Any help much appreciated :)

Upvotes: 3

Views: 6412

Answers (2)

Serid
Serid

Reputation: 361

Here is an unbuffered generic channel implementation. It was derived directly from @seccpur 's answer.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>

using namespace std;

/// Lock type used together with the condition variables below.
using mutex_lock = std::unique_lock<std::mutex>;

/// Single-slot blocking channel for handing values of type T between threads.
///
/// The channel holds at most one value at a time. write() blocks while the
/// slot is still occupied and read() blocks until a value is present, so
/// writes and reads strictly alternate. Values are moved into and out of the
/// slot, so T must be default-constructible and movable; copyable types keep
/// working exactly as before, and move-only types are supported as well.
template<class T>
class channel {
    T buffer;                            // the single storage slot
    std::mutex buffer_mutex;             // guards buffer and data_avail
    std::condition_variable read_cond;   // notified when a value is stored
    std::condition_variable write_cond;  // notified when the slot is emptied
    bool data_avail = false;             // true while buffer holds an unread value

public:
    /// Stores `data` in the slot, blocking until any previous value was read.
    /// @param data value to hand over; taken by value and moved into the slot.
    void write(T data) {
        mutex_lock lock(buffer_mutex);
        // The predicate form re-checks the flag on every wake-up, so neither a
        // spurious wake-up nor an early notification can break the hand-off.
        write_cond.wait(lock, [this]() { return !data_avail; });
        buffer = std::move(data);
        data_avail = true;
        read_cond.notify_all();
    }

    /// Blocks until a value is available, then removes and returns it.
    /// @return the value most recently stored by write().
    T read() {
        mutex_lock lock(buffer_mutex);
        read_cond.wait(lock, [this]() { return data_avail; });
        // Move out instead of copying; the slot is overwritten by the next
        // write() before it is ever read again.
        T item = std::move(buffer);
        data_avail = false;
        write_cond.notify_all();
        return item;
    }
};

int main() {
    const int LENGTH = 30;

    channel<int> channel1;

    auto function1 = [&](){
        for (int i = 0; i < LENGTH; ++i) {
            channel1.write(i);
        }
    };

    auto function2 = [&](){
        for (int i = 0; i < LENGTH; ++i) {
            std::cout << 2 * channel1.read() << std::endl;
        }
    };

    thread t1(function1);
    thread t2(function2);
    t1.join();
    t2.join();
    return 0;
}

Upvotes: 0

ObliteratedJillo
ObliteratedJillo

Reputation: 5166

See the code below. I introduced a bool data_avail to make the intent clear and added a time delay so that the writer doesn't hold the lock all the time; the while (item == "") loop was removed because it is unnecessary.

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <string>
#include <chrono>

using namespace std;

/// Lock type used together with the condition variable below.
using mutex_lock = std::unique_lock<std::mutex>;

/// Unbounded FIFO channel of strings shared between threads.
///
/// write() never blocks beyond acquiring the mutex; read() blocks until at
/// least one item is queued. Items are returned in the order written.
class channel {
public:
    std::list<std::string> buffer;  // queued items, oldest at the front
    std::mutex buffer_mutex;        // controls access to buffer and data_avail
    std::condition_variable cv;     // notified after every write()
    bool data_avail = false;        // mirrors !buffer.empty(); updated under the mutex

    /// Appends `data` to the queue and wakes any waiting reader.
    void write(std::string data) {
        mutex_lock lock(buffer_mutex);
        buffer.push_back(data);
        data_avail = true;
        cv.notify_all();
    }

    /// Blocks until an item is queued, then removes and returns the oldest one.
    std::string read() {
        mutex_lock lock(buffer_mutex);
        // Wait on the queue itself rather than on a one-shot flag: if the
        // writer has queued several items, each of them must satisfy a read().
        // Testing the buffer also guarantees front() is never called on an
        // empty list.
        cv.wait(lock, [this]() { return !buffer.empty(); });
        std::string item = buffer.front();
        buffer.pop_front();
        // Clear the flag only once the queue is really drained; clearing it
        // after every read would strand the remaining items.
        data_avail = !buffer.empty();
        return item;
    }
};

// Single channel instance shared by the two thread functions below:
// function1() writes to it and function2() reads from it.
channel channel1;

// Writer thread body: sends "0" through "9" to channel1, pausing 100 ms
// after each write.
void function1() {
    const auto pause = chrono::milliseconds(100);
    for (int n = 0; n != 10; ++n) {
        channel1.write(to_string(n));
        this_thread::sleep_for(pause);
    }
}

void function2() {
    string val;
    for (int i = 0; i < 10; i++) {
        val = channel1.read();
        cout << val << "\n";

    }
}

int main() {
    thread t1(function1);
    thread t2(function2);
    t1.join();
    t2.join();
    return 0;
}

Output:

enter image description here

Upvotes: 3

Related Questions