gatopeich

Integrity of buffers managed by a lock-free container

(Clarification after two answers that misunderstood the question: the code works well if the number of producer threads is less than the stack size. There is only one consumer, which releases slots. I tuned this demo with 32 producers vs. 16 slots to trigger a bad condition quickly.)

While stress-testing a lock-free stack used for multi-threaded buffer management, I discovered that the integrity of the buffers' content is not guaranteed. I am now quite sure that the stack/LIFO solution is not the best option, but I still want to understand how these buffers are being compromised.

The idea is: a lock-free stack holds pointers to buffers that are "free". Any one of many producer threads can retrieve them. The buffers are then filled with data and "dispatched" to a single consumer thread, which eventually returns them to the stack.

The observation is that either:

  • two threads are somehow getting the same buffer, or
  • one thread is getting a buffer whose memory has not yet been flushed by the other thread that just released it.

Here is the simplest example I could put together for the purpose of demonstration:

UPDATE: I made a version with better debug output for anyone who wants to play with it, here: https://ideone.com/v9VAqU

#include <atomic>
#include <assert.h>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

using namespace std;

#define N_SLOTS 16
#define N_THREADS 32

// The data buffers that are shared among threads
class Buffer { public: int data[N_THREADS] = {0}; } buffers[N_SLOTS];

// The lock-free stack under study
class LockFreeStack
{
    Buffer* stack[N_SLOTS];
    atomic_int free_slots, out_of_slots, retries;
public:
    LockFreeStack() : free_slots(0), out_of_slots(0), retries(0) {
        for (int i=0; i<N_SLOTS; i++)
            release_buffer(&buffers[i]);
    }
    Buffer* get_buffer()
    {
        int slot = --free_slots;
        if (slot < 0) {
            out_of_slots++;
            return nullptr;
        }
/// [EDIT] CAN GET PREEMPTED RIGHT HERE, BREAKING ATOMICITY!
        return stack[slot];
    }
    void release_buffer(Buffer* buf)
    {
        int slot;
        while(true) {
            slot = free_slots;
            if (slot <= 0) {
                stack[0] = buf;
                free_slots = 1;
                break;
            }
            stack[slot] = buf;
            if (free_slots++ == slot)
                break;
            retries++;
        }
    }
    ostream& toStream(ostream& oss) {
        return oss << "LockFreeStack with free_slots=" << free_slots << ", oos=" << out_of_slots << ", retries=" << retries;
    }
} lockFreeStack;

// Utility class to help with test
class PrintQueue {
    queue<Buffer*> q;
    mutex m;
public:
    void add(Buffer* buf) {
        lock_guard<mutex> lock(m);
        q.push(buf);
    }
    Buffer* pop() {
        lock_guard<mutex> lock(m);
        Buffer* buf;
        if (q.empty())
            return nullptr;
        buf = q.front();
        q.pop();
        return buf;
    }
} printQueue;

int main()
{
    vector<thread> workers;
    for (int t = 0; t < N_THREADS; ++t) {
        workers.push_back(thread([&,t] {
            while(true) {
                auto buf = lockFreeStack.get_buffer();
                if (buf) {
                    buf->data[t] = t;
                    this_thread::sleep_for(chrono::milliseconds(10));
                    printQueue.add(buf);
                }
            }
        }));
    }
    while(true) {
        this_thread::sleep_for(chrono::milliseconds(10));
        lockFreeStack.toStream(cout) << endl;
        Buffer *buf;
        while((buf = printQueue.pop())) {
            cout << "Got Buffer " << buf << " #" << (buf-buffers) << " { ";
            int used = 0;
            for(int t=0; t<N_THREADS; t++)
                if (buf->data[t]) {
                    used += 1;
                    cout << 't' << buf->data[t] << ' ';
                    buf->data[t] = 0;
                }
            cout << "}\n";
            assert (used == 1);
            lockFreeStack.release_buffer(buf);
        }
    }
    return 0;
}

And a sample of bad output:

> LockFreeStack with free_slots=-2454858, oos=2454836, retries=0
> Got Buffer 0x604a40 #12 { t7 }
> Got Buffer 0x6049c0 #11 { t8 }
> Got Buffer 0x604b40 #14 { t1 }
> Got Buffer 0x604bc0 #15 { }
> test.cpp:111: int main(): Assertion `used == 1' failed.

I have tried using std::atomic_thread_fence() all over the place, but it makes no difference.

Where is the fault?

(By the way, I tested with several versions of GCC, including 5.2 and 4.6.)

Answers (3)

gatopeich

Thanks for the ideas. I eventually found the issue:

  1. free_slots == N (with N < N_SLOTS, so the consumer still holds a buffer to release).
  2. get_buffer() in producer thread #2 takes slot N-1, but gets PREEMPTED before reading the pointer in stack[N-1].
  3. release_buffer(buf) in the single consumer thread #1 puts a new buffer into the slot that is theoretically vacated, precisely stack[N-1]!

Now the pointer originally in stack[N-1] has been lost (memory leak), and the next thread to call get_buffer() is going to get the same buffer as thread #2 if it wakes around the same time.
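The interleaving above can be replayed deterministically in a single thread. This is a sketch under stated assumptions, not the original test: plain globals stand in for the atomics (only one "thread" runs at a time, so there is no data race), N is assumed to be 2 with 4 buffers, and get_buffer() is split by hand into two hypothetical helpers, get_buffer_reserve() and get_buffer_read(), to model the preemption between the decrement and the read.

```cpp
#include <cassert>

// Deterministic, single-threaded replay of the interleaving described above.
// Plain globals stand in for the question's atomics.
struct Buffer {};

Buffer buffers[4];
Buffer* stack[4];
int free_slots = 0;

// get_buffer() split into its two steps (hypothetical helpers) so that a
// preemption between them can be modelled.
int get_buffer_reserve() { return --free_slots; }
Buffer* get_buffer_read(int slot) { return stack[slot]; }

// Same logic as the question's release_buffer().
void release_buffer(Buffer* buf) {
    while (true) {
        int slot = free_slots;
        if (slot <= 0) { stack[0] = buf; free_slots = 1; return; }
        stack[slot] = buf;
        if (free_slots++ == slot) return;
    }
}

struct ReplayResult { Buffer* thread2; Buffer* thread3; };

ReplayResult replay() {
    stack[0] = &buffers[0];
    stack[1] = &buffers[1];
    free_slots = 2;                       // step 1: free_slots == N, with N == 2
    int slot2 = get_buffer_reserve();     // step 2: thread #2 takes slot N-1 == 1...
                                          // ...and is preempted before reading stack[1]
    release_buffer(&buffers[2]);          // step 3: thread #1 writes into stack[1]
    assert(stack[1] == &buffers[2]);      // the pointer to buffers[1] is gone
    ReplayResult r;
    r.thread2 = get_buffer_read(slot2);   // thread #2 resumes and reads stack[1]
    int slot3 = get_buffer_reserve();     // the next caller also gets slot 1
    r.thread3 = get_buffer_read(slot3);
    return r;
}
```

Calling replay() hands &buffers[2] to both "threads", while buffers[1] is no longer referenced by the stack.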

yb303

Your LockFreeStack code is completely broken.

Two threads calling release_buffer concurrently can write two pointers into the same slot, so one of them is lost.

if (free_slots++ == slot) will succeed for only one thread, so the other one retries and puts its pointer in another slot. But the retrying thread's pointer could also be the one that survived in the first slot, so you end up with the same pointer in two slots.

You can get the same effect with one thread calling release_buffer and another calling get_buffer. One or both of these scenarios are responsible for your corruption.

release_buffer is not bounded by the size of the stack, so expect buffer overruns, and then all hell breaks loose.

I suggest:

  1. In release_buffer, first choose a unique slot atomically, then write to it.

  2. When multiple releasers compete for slots, the order in which pointers are written into slots is not guaranteed, so you need some other means to mark a slot as valid in release_buffer and as invalid in get_buffer. The easiest way to do it is to null the slot in get_buffer.

  3. Bound the counters to the size of the stack. If you cannot do it in one atomic op, take a copy, make all the changes, then CAS it back.
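The three suggestions can be combined roughly as follows. This is a minimal sketch under stated assumptions, not a drop-in replacement: BoundedPool and its members are hypothetical names, and the per-slot std::atomic<Buffer*> layout is one possible way to mark slots valid or invalid. Each index is reserved with a bounded CAS on the counter before the slot is touched; get_buffer() takes the pointer with exchange(nullptr), and release_buffer() only writes once the slot is null. Because a thread may have reserved an index without having finished its slot operation yet, both sides spin-wait (with yield) on the slot, so strictly speaking this is blocking rather than lock-free.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

struct Buffer { int data[32] = {0}; };  // same shape as the question's Buffer (N_THREADS == 32)

// Hypothetical bounded pool following the three suggestions above.
template <std::size_t N>
class BoundedPool {
    std::atomic<Buffer*> slots_[N];  // nullptr means "no buffer in this slot"
    std::atomic<int> count_{0};      // reserved (logically present) buffers, kept in [0, N]

public:
    BoundedPool() {
        for (auto& s : slots_) s.store(nullptr);
    }

    // Suggestion 3: bound the counter with a CAS loop so it never goes negative.
    Buffer* get_buffer() {
        int c = count_.load();
        do {
            if (c == 0) return nullptr;  // empty
        } while (!count_.compare_exchange_weak(c, c - 1));
        // Index c-1 is now ours. A releaser may have reserved it but not yet
        // written its pointer, so spin until the slot is non-null, taking it
        // and leaving nullptr behind (suggestion 2).
        Buffer* b;
        while ((b = slots_[c - 1].exchange(nullptr)) == nullptr)
            std::this_thread::yield();
        return b;
    }

    // Suggestion 1: reserve a unique index atomically first, then write to it.
    bool release_buffer(Buffer* buf) {
        assert(buf != nullptr);  // nullptr is reserved as the "empty slot" marker
        int c = count_.load();
        do {
            if (c == static_cast<int>(N)) return false;  // full: refuse instead of overrunning
        } while (!count_.compare_exchange_weak(c, c + 1));
        // Index c is now ours, but a getter that reserved it earlier may not
        // have taken the old pointer yet; only write once the slot is null.
        Buffer* expected = nullptr;
        while (!slots_[c].compare_exchange_weak(expected, buf)) {
            expected = nullptr;
            std::this_thread::yield();
        }
        return true;
    }
};
```

When operations on the same index overlap, buffers may not come back out in strict LIFO order, which should be harmless for a pool of interchangeable buffers. With the default sequentially consistent atomics, the releasing thread's writes to a buffer happen-before the accesses of the thread that next takes it from the slot.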

EDIT

Here's a scenario where the same buffer ends up in 2 cells:

                                  ////T==0  free_slots==5

// thread 1
void release_buffer(Buffer* buf)  ////T==1  buf==buffers[7]
{
    int slot;
    while(true) {                 //// 1st iteration
        slot = free_slots;        ////T==2  free_slots==5 slot==5
        if (slot <= 0) {
            stack[0] = buf;            
            free_slots = 1;            
            break;
        }                         ////*** note other threads below ***
        stack[slot] = buf;        ////     stack[5]==buffers[7]
        if (free_slots++ == slot) ////T==5 free_slots==4 slot==5 ---> go for another round
            break;
        retries++;
    }
    while(true) {                 //// 2nd iteration
        slot = free_slots;        ////T==6 free_slots==4 slot==4
        if (slot <= 0) {
            stack[0] = buf;            
            free_slots = 1;            
            break;
        }
        stack[slot] = buf;        ////     stack[4]==buffers[7] //// BOOM!!!!
        if (free_slots++ == slot) ////T==7 free_slots==5 slot==4 ---> no other round
            break;
        retries++;
    }
}

// thread 2
Buffer* get_buffer() // thread
{
    int slot = --free_slots;      ////T==3  free_slots==4
    if (slot < 0) {
        out_of_slots++;
        return nullptr;
    }
    return stack[slot];
}

// thread 3
Buffer* get_buffer()
{
    int slot = --free_slots;      ////T==4  free_slots==3
    if (slot < 0) {
        out_of_slots++;
        return nullptr;
    }
    return stack[slot];
}

EDIT 2: The assertion failure...

If you haven't found it by now, here it is:

//// producer t==0
buf->data[t] = t; //// buf->data[t] == 0

//consumer
for(int t=0; t<N_THREADS; t++)  // first iteration, t==0
  if (buf->data[t]) { //// buf->data[t] == 0, branch not taken
    used += 1;
    ...
  //// used remains ==0   -----> assert fails

Writing t+1 in the buffer will fix it.
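A minimal sketch of that fix, assuming the same 32-entry data array as the question (kThreads stands in for N_THREADS); count_marks() and mark() are hypothetical helpers that isolate the consumer's check and the producer's write. With the original `data[t] = t`, thread 0 writes 0, which the consumer cannot distinguish from an unused entry.

```cpp
#include <cassert>

constexpr int kThreads = 32;  // stands in for N_THREADS from the question
struct Buffer { int data[kThreads] = {0}; };

// Consumer-side check from the question: how many entries are non-zero?
int count_marks(const Buffer& buf) {
    int used = 0;
    for (int t = 0; t < kThreads; ++t)
        if (buf.data[t]) ++used;  // zero is read as "no thread wrote here"
    return used;
}

// Producer-side write. offset == 0 reproduces `data[t] = t`;
// offset == 1 is the suggested `data[t] = t + 1`.
void mark(Buffer& buf, int t, int offset) {
    assert(t >= 0 && t < kThreads);
    buf.data[t] = t + offset;
}
```

With offset 1, the consumer would print `buf->data[t] - 1` to recover the thread number.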

Sam Varshavchik

I compiled and executed the following using gcc 5.3 on Linux:

#include <atomic>
#include <iostream>

int main()
{
    for (int i=0; i<5; ++i)
    {
        std::atomic_int n;

        std::cout << n << std::endl;

        n=4;
    }
    return 0;
}

The resulting output was as follows:

306406976
4
4
4
4

From that, I concluded that std::atomic_int's constructor does not clear the initial value of the atomic integer; it must be explicitly initialized. I wanted to verify this because I am not very familiar with the atomic library. My results suggest that std::atomic_int objects are not automatically initialized to 0.

I was prompted to check whether std::atomic_int objects were being initialized by the following observations:
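A minimal sketch of the explicit initialization recommended here (all_start_at_zero is a hypothetical name). Before C++20, a default-constructed std::atomic_int with automatic storage has an indeterminate value, which matches the first line of the output above; since C++20 the default constructor value-initializes it to 0. Initializing explicitly gives a well-defined starting value under either standard.

```cpp
#include <atomic>
#include <cassert>

// Same loop shape as the experiment above, but each atomic is explicitly
// initialized, so its first load() is well defined. Returns true if every
// freshly constructed atomic started at 0.
bool all_start_at_zero(int iterations) {
    assert(iterations >= 0);
    for (int i = 0; i < iterations; ++i) {
        std::atomic_int n{0};       // explicit initialization
        if (n.load() != 0) return false;
        n = 4;                      // a new object is constructed on the next iteration
    }
    return true;
}
```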

  • Here, the LockFreeStack constructor does not explicitly initialize the std::atomic_int class members either.

  • The constructor invokes the release_buffer() method.

  • The release_buffer() method reads and uses free_slots.

From this, I must conclude that this is undefined behavior.
