Andy

Reputation: 3789

Do I have a deadlock or race condition in my producer consumer code?

I am trying to implement a producer <-> consumer pattern in C++. Everything I read about this pattern seems to mention a potential deadlock that has to be avoided. However, I have implemented it below without using any mutex. What is wrong with my code?

#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>
#include <cstdlib>  // rand
#include <cstring>  // memcpy

class CircularBuffer
{
public:
    CircularBuffer();
    int*          getWritePos();
    void      finishedWriting();
    int*           getReadPos();
    void      finishedReading();
private:
    void waitForReaderToCatchUp();
    void waitForWriterToCatchUp();

    const int size = 5;
    std::vector<int> data;
    // Changed from int since these variables are shared between the two threads and assignment is not necessarily atomic: 
    std::atomic<int> writePos{0};  // brace-init: copy-initialization from int does not compile before C++17
    std::atomic<int> readPos{0};
};

CircularBuffer::CircularBuffer() {
    data.resize(size);
}

void
CircularBuffer::waitForReaderToCatchUp() {
    int unread = writePos - readPos;
    while (unread >= size) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getWritePos() {
    waitForReaderToCatchUp();
    int pos = writePos % size;
    return &data[pos];
}

void
CircularBuffer::finishedWriting() {
    writePos++;
}

void
CircularBuffer::waitForWriterToCatchUp() {
    int unread = writePos - readPos;
    while (unread < 1) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getReadPos() {
    waitForWriterToCatchUp();
    int pos = readPos % size;
    return &data[pos];
}

void
CircularBuffer::finishedReading() {
    readPos++;
}

const int produceMinTime = 100;

void produce(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        int r = rand() % 1000;
        std::this_thread::sleep_for(std::chrono::milliseconds(produceMinTime + r));
        int *p = cb->getWritePos();
        memcpy(p, &i, sizeof(i));  // copy exactly one int rather than assuming it is 4 bytes
        cb->finishedWriting();
    }
}

void consume(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int *p = cb->getReadPos();
        int j = *p;
        std::cout << "Value: " << j << std::endl;
        cb->finishedReading();
    }
}

int main()
{
    CircularBuffer cb;
    std::thread t1(produce, &cb);
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    std::thread t2(consume, &cb);

    t1.join();
    t2.join();
    int k;
    std::cin >> k;
}

Upvotes: 0

Views: 132

Answers (1)

Lou Franco

Reputation: 89142

std::vector<int> is not a thread-safe data structure. If two threads access the same element at the same time without synchronization, and at least one of them writes, that is a data race, which is undefined behavior. You could crash, see corrupted values, or it could seemingly work (but still be wrong).

The ints inside the vector and the ones representing your positions are also not thread-safe: reading or writing a plain int isn't necessarily atomic (though there are lock-free ways to get that).
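As a small illustration of the lock-free route for a shared counter, here is a minimal sketch using std::atomic<int>. The function name countWithAtomic and its parameter are made up for this example and are not part of the question's code. Because every increment goes through fetch_add, no update is lost; a plain int in the same position would be a data race.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical helper (not from the question): two threads each add
// `perThread` increments to one shared atomic counter.
int countWithAtomic(int perThread) {
    assert(perThread >= 0);
    std::atomic<int> counter{0};

    auto work = [&counter, perThread] {
        for (int i = 0; i < perThread; ++i) {
            // Atomic read-modify-write: no increment can be lost.
            // Relaxed ordering is enough here because only the final
            // total matters and join() synchronizes with the caller.
            counter.fetch_add(1, std::memory_order_relaxed);
        }
    };

    std::thread a(work);
    std::thread b(work);
    a.join();
    b.join();
    return counter.load();
}
```

Calling countWithAtomic(100000) should return exactly 200000 on every run, since no increment can be lost.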

So, you could totally implement something like this lock-free, but not this way. Some info here: https://www.infoq.com/news/2014/10/cpp-lock-free-programming/

Generally, you want to look at the primitives in std::atomic: https://en.cppreference.com/w/cpp/atomic/atomic

Also see: Ring buffer with atomic indexes
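For reference, a ring buffer with atomic indexes might look roughly like the sketch below. It assumes exactly one producer thread and one consumer thread. The names SpscRing, tryPush, tryPop and transferSequence are invented for this illustration, and this is only one possible arrangement, not the code from the linked question. Each side publishes its index with a release store and reads the other side's index with an acquire load, so a slot is never read before it is written or overwritten before it is read.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical single-producer/single-consumer ring buffer (illustration only).
template <typename T, std::size_t Capacity>
class SpscRing {
public:
    // Producer thread only. Returns false if the buffer is full.
    bool tryPush(const T& value) {
        const std::size_t w = writePos_.load(std::memory_order_relaxed);
        const std::size_t r = readPos_.load(std::memory_order_acquire);
        if (w - r >= Capacity) return false;                // full
        slots_[w % Capacity] = value;                       // write the slot first
        writePos_.store(w + 1, std::memory_order_release);  // then publish it
        return true;
    }

    // Consumer thread only. Returns false if the buffer is empty.
    bool tryPop(T& out) {
        const std::size_t r = readPos_.load(std::memory_order_relaxed);
        const std::size_t w = writePos_.load(std::memory_order_acquire);
        if (w == r) return false;                           // empty
        out = slots_[r % Capacity];                         // read the slot first
        readPos_.store(r + 1, std::memory_order_release);   // then free it
        return true;
    }

private:
    std::array<T, Capacity> slots_{};
    std::atomic<std::size_t> writePos_{0};
    std::atomic<std::size_t> readPos_{0};
};

// Hypothetical demo: the producer sends 0..count-1 through a 5-slot ring
// and the consumer collects the values in arrival order.
std::vector<int> transferSequence(int count) {
    assert(count >= 0);
    SpscRing<int, 5> ring;
    std::vector<int> received;
    received.reserve(static_cast<std::size_t>(count));

    std::thread producer([&ring, count] {
        for (int i = 0; i < count; ++i) {
            while (!ring.tryPush(i)) std::this_thread::yield();  // wait for space
        }
    });
    std::thread consumer([&ring, &received, count] {
        int value = 0;
        while (static_cast<int>(received.size()) < count) {
            if (ring.tryPop(value)) received.push_back(value);
            else std::this_thread::yield();                      // wait for data
        }
    });

    producer.join();
    consumer.join();
    return received;
}
```

The two counters play the same role as writePos and readPos in the question. The sketch returns false instead of sleeping when the buffer is full or empty, which leaves the waiting strategy to the caller.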

Upvotes: 1
