Reputation: 3789
I am trying to implement the producer <-> consumer pattern in C++. Whenever I read about this pattern, there is always a mention of a potential deadlock that has to be avoided. However, I have implemented it below without using any mutex. What is wrong with my code?
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>
#include <cstdlib>  // rand
#include <cstring>  // memcpy

class CircularBuffer
{
public:
    CircularBuffer();
    int* getWritePos();
    void finishedWriting();
    int* getReadPos();
    void finishedReading();
private:
    void waitForReaderToCatchUp();
    void waitForWriterToCatchUp();
    const int size = 5;
    std::vector<int> data;
    // Changed from int since these variables are shared between the two threads and assignment is not necessarily atomic:
    // (brace-initialized because std::atomic is not copyable, so "= 0" is ill-formed before C++17)
    std::atomic<int> writePos{0};
    std::atomic<int> readPos{0};
};

CircularBuffer::CircularBuffer() {
    data.resize(size);
}

// Blocks the writer while the buffer is full.
void
CircularBuffer::waitForReaderToCatchUp() {
    int unread = writePos - readPos;
    while (unread >= size) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getWritePos() {
    waitForReaderToCatchUp();
    int pos = writePos % size;
    return &data[pos];
}

void
CircularBuffer::finishedWriting() {
    writePos++;
}

// Blocks the reader while the buffer is empty.
void
CircularBuffer::waitForWriterToCatchUp() {
    int unread = writePos - readPos;
    while (unread < 1) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getReadPos() {
    waitForWriterToCatchUp();
    int pos = readPos % size;
    return &data[pos];
}

void
CircularBuffer::finishedReading() {
    readPos++;
}

const int produceMinTime = 100;

void produce(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        int r = rand() % 1000;
        std::this_thread::sleep_for(std::chrono::milliseconds(produceMinTime + r));
        int *p = cb->getWritePos();
        memcpy(p, &i, sizeof(int));  // was a hard-coded 4
        cb->finishedWriting();
    }
}

void consume(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int *p = cb->getReadPos();
        int j = *p;
        std::cout << "Value: " << j << std::endl;
        cb->finishedReading();
    }
}

int main()
{
    CircularBuffer cb;
    std::thread t1(produce, &cb);
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    std::thread t2(consume, &cb);
    t1.join();
    t2.join();
    // Keep the console window open until something is entered.
    int k;
    std::cin >> k;
}
Upvotes: 0
Views: 132
Reputation: 89142
std::vector<int> is not a thread-safe data structure. If one thread writes to it while another thread accesses the same element without synchronization, that is a data race and therefore undefined behavior. It could crash, misbehave in other ways, or appear to work while still being wrong.
The ints inside the vector, and the ones representing your positions are also not thread-safe -- read/write isn't necessarily atomic (there are lock-free ways to do that).
So, you could totally implement something like this lock-free, but not this way. Some info here: https://www.infoq.com/news/2014/10/cpp-lock-free-programming/
Generally, you want to look at the primitives in std::atomic: https://en.cppreference.com/w/cpp/atomic/atomic
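To make the std::atomic suggestion a bit more concrete, here is a minimal sketch of two threads incrementing one shared counter. The function name countWithAtomic is made up for illustration and is not part of the question's code. With a plain int the increments could be lost; with std::atomic<int> each fetch_add is a single indivisible read-modify-write.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical helper: two threads each add 1 to a shared counter n times.
int countWithAtomic(int n) {
    assert(n >= 0);
    std::atomic<int> counter{0};

    // Relaxed ordering is enough here because only the final total matters
    // and join() synchronizes before the result is read.
    auto work = [&counter, n] {
        for (int i = 0; i < n; ++i) {
            counter.fetch_add(1, std::memory_order_relaxed);
        }
    };

    std::thread a(work);
    std::thread b(work);
    a.join();
    b.join();
    return counter.load();
}
```

Calling countWithAtomic(100000) from main should always give 200000, whereas the same loop on a non-atomic int is a data race and may give less.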
Also see: Ring buffer with atomic indexes
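As a rough idea of what a ring buffer with atomic indexes can look like, here is a sketch under some assumptions: exactly one producer thread and one consumer thread, and names (SpscRing, tryPush, tryPop, transferSum) that I invented for this example. It is not the code from the linked question. The release store on an index publishes the element written before it, and the matching acquire load on the other side makes that element visible.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical single-producer/single-consumer ring buffer.
// Only safe with exactly one pushing thread and one popping thread.
class SpscRing {
public:
    explicit SpscRing(std::size_t capacity) : data_(capacity) {
        assert(capacity > 0);
    }

    // Producer side only. Returns false when the buffer is full.
    bool tryPush(int value) {
        const std::size_t w = writePos_.load(std::memory_order_relaxed);
        const std::size_t r = readPos_.load(std::memory_order_acquire);
        if (w - r == data_.size()) return false;
        data_[w % data_.size()] = value;
        // Publish the element: the consumer's acquire load pairs with this.
        writePos_.store(w + 1, std::memory_order_release);
        return true;
    }

    // Consumer side only. Returns false when the buffer is empty.
    bool tryPop(int& out) {
        const std::size_t r = readPos_.load(std::memory_order_relaxed);
        const std::size_t w = writePos_.load(std::memory_order_acquire);
        if (w == r) return false;
        out = data_[r % data_.size()];
        // Hand the slot back: the producer's acquire load pairs with this.
        readPos_.store(r + 1, std::memory_order_release);
        return true;
    }

private:
    std::vector<int> data_;
    std::atomic<std::size_t> writePos_{0};  // total elements ever written
    std::atomic<std::size_t> readPos_{0};   // total elements ever read
};

// Hypothetical demo: sends 0..count-1 through a 5-slot ring and
// returns the sum of the values the consumer received.
long long transferSum(int count) {
    SpscRing ring(5);
    long long sum = 0;
    std::thread producer([&ring, count] {
        for (int i = 0; i < count; ++i) {
            while (!ring.tryPush(i)) std::this_thread::yield();
        }
    });
    std::thread consumer([&ring, &sum, count] {
        for (int i = 0; i < count; ++i) {
            int v = 0;
            while (!ring.tryPop(v)) std::this_thread::yield();
            sum += v;
        }
    });
    producer.join();
    consumer.join();
    return sum;
}
```

Neither side ever waits on a lock held by the other, which is why the classic mutex deadlock cannot occur here. The price is that this only works for one producer and one consumer; with more threads on either side the indexes would need compare-and-swap or a mutex.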
Upvotes: 1