Reputation: 31
I am trying to include condition variables and channels in my multithreaded program, and have made a basic program to try and understand how they work. In this program, one thread will add numbers 0 to 9 to the channel buffer, and the other thread will display each number and pop it from the buffer.
Currently, the program runs but nothing is displayed. I suspect threads are waiting on a resource and so have entered deadlock, but I'm not sure how to fix this.
Source.cpp (threads are called):
#include "channel.h"
#include <iostream>
// Channel shared by both threads: function1 writes to it, function2 reads from it.
channel channel1;
// Producer: sends the decimal strings "0" through "9" into channel1, in order.
void function1() {
    int next = 0;
    while (next < 10) {
        channel1.write(to_string(next));
        ++next;
    }
}
// Consumer: takes ten values from channel1 and prints each one on its own line.
void function2() {
    for (int remaining = 10; remaining > 0; --remaining) {
        const string received = channel1.read();
        cout << received << "\n";
    }
}
// Entry point: runs function1 and function2 on separate threads and waits for
// both to finish.
//
// Declared as `int main()` because standard C++ requires main to return int;
// `void main()` is a compiler extension that portable toolchains reject.
int main() {
    thread t1(function1);
    thread t2(function2);
    // Join both threads before returning: destroying a joinable std::thread
    // calls std::terminate.
    t1.join();
    t2.join();
    return 0;
}
channel.h (Methods for writing to/reading from buffer):
#pragma once
#include <condition_variable>
#include <list>
#include <mutex>
#include <string>
using namespace std;
typedef unique_lock<mutex> mutex_lock;
// Thread-safe, unbounded FIFO channel of strings.
//
// write() appends an item and wakes any waiting readers. read() blocks until at
// least one item is buffered, then removes and returns the oldest one.
class channel {
public:
    std::list<std::string> buffer;  // pending items, oldest at the front
    std::mutex buffer_mutex;        // controls access to buffer
    std::condition_variable cv;     // notified each time an item is added

    // Appends `data` to the back of the buffer and notifies waiting readers.
    void write(std::string data) {
        std::unique_lock<std::mutex> lock(buffer_mutex);
        buffer.push_back(data);
        cv.notify_all();
    }

    // Blocks until the buffer is non-empty, then pops and returns the front item.
    std::string read() {
        std::unique_lock<std::mutex> lock(buffer_mutex);
        // Wait on the buffer's state, not on the notification itself. A bare
        // cv.wait(lock) misses every notify issued before the reader starts
        // waiting (the writer can finish all its writes first), which leaves
        // the reader blocked forever. The predicate form returns immediately
        // when data is already queued and also absorbs spurious wakeups.
        cv.wait(lock, [this] { return !buffer.empty(); });
        std::string item = buffer.front();
        buffer.pop_front();
        return item;
    }
};
Any help much appreciated :)
Upvotes: 3
Views: 6412
Reputation: 361
Here is an unbuffered generic channel implementation. It was derived directly from @seccpur's answer.
#include <iostream>
#include <thread>
#include <condition_variable>
using namespace std;
typedef unique_lock<mutex> mutex_lock;
// Single-slot ("unbuffered") channel carrying values of type T.
//
// write() blocks while a previously written value has not been read yet;
// read() blocks until a value is present. Both sides share one mutex and use a
// separate condition variable per direction.
template<class T>
class channel {
    T buffer;                       // the one value currently held
    mutex buffer_mutex;             // controls access to buffer and data_avail
    condition_variable read_cond;   // readers wait here for a value to arrive
    condition_variable write_cond;  // writers wait here for the slot to empty
    bool data_avail = false;        // true while buffer holds an unread value

public:
    // Stores `data` once the slot is free, then wakes waiting readers.
    void write(T data) {
        mutex_lock guard(buffer_mutex);
        while (data_avail) {
            write_cond.wait(guard);
        }
        buffer = data;
        data_avail = true;
        read_cond.notify_all();
    }

    // Waits for a value, marks the slot free, wakes waiting writers, and
    // returns a copy of the value.
    T read() {
        mutex_lock guard(buffer_mutex);
        while (!data_avail) {
            read_cond.wait(guard);
        }
        T received = buffer;
        data_avail = false;
        write_cond.notify_all();
        return received;
    }
};
// Demo driver: one thread sends 0..LENGTH-1 through the channel while a second
// thread receives each value and prints twice that value on its own line.
int main() {
    const int LENGTH = 30;
    channel<int> channel1;

    // Sender side.
    auto producer = [&]() {
        int value = 0;
        while (value < LENGTH) {
            channel1.write(value);
            ++value;
        }
    };

    // Receiver side.
    auto consumer = [&]() {
        for (int remaining = LENGTH; remaining > 0; --remaining) {
            const int received = channel1.read();
            std::cout << 2 * received << std::endl;
        }
    };

    thread t1(producer);
    thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}
Upvotes: 0
Reputation: 5166
See this code: I introduced a bool data_avail
to make the intent clear, added a time delay so that the writer doesn't hold the lock all the time, and removed the while (item == "")
loop since it was unnecessary.
#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <string>
#include <chrono>
using namespace std;
typedef unique_lock<mutex> mutex_lock;
// Thread-safe, unbounded FIFO channel of strings.
//
// write() appends an item and wakes any waiting readers. read() blocks until at
// least one item is buffered, then removes and returns the oldest one.
class channel {
public:
    std::list<std::string> buffer;  // pending items, oldest at the front
    std::mutex buffer_mutex;        // controls access to buffer and data_avail
    std::condition_variable cv;     // notified each time an item is added
    bool data_avail = false;        // kept equal to !buffer.empty()

    // Appends `data` to the back of the buffer and notifies waiting readers.
    void write(std::string data) {
        std::unique_lock<std::mutex> lock(buffer_mutex);
        buffer.push_back(data);
        data_avail = true;
        cv.notify_all();
    }

    // Blocks until the buffer is non-empty, then pops and returns the front item.
    std::string read() {
        std::unique_lock<std::mutex> lock(buffer_mutex);
        // Wait on the buffer itself. Clearing a single flag after every read
        // loses track of items when the writer has queued more than one, so the
        // next read would block even though data is still pending.
        cv.wait(lock, [this] { return !buffer.empty(); });
        std::string item = buffer.front();
        buffer.pop_front();
        // Keep the public flag consistent with what is actually left queued.
        data_avail = !buffer.empty();
        return item;
    }
};
// Channel shared by both threads: function1 writes to it, function2 reads from it.
channel channel1;
// Producer: sends the decimal strings "0" through "9" into channel1, sleeping
// for 100 ms after each write.
void function1() {
    const chrono::milliseconds pause(100);
    int next = 0;
    while (next < 10) {
        channel1.write(to_string(next));
        this_thread::sleep_for(pause);
        ++next;
    }
}
// Consumer: takes ten values from channel1 and prints each one on its own line.
void function2() {
    for (int remaining = 10; remaining > 0; --remaining) {
        const string received = channel1.read();
        cout << received << "\n";
    }
}
// Entry point: starts the writer (function1) and the reader (function2) on
// their own threads, then waits for each to finish before exiting.
int main() {
    thread writer(function1);
    thread reader(function2);

    writer.join();
    reader.join();

    return 0;
}
Output:
Upvotes: 3