Reputation: 952
I'm trying to implement a lock-free queue that uses a linear circular buffer to store data. In contrast to a general-purpose lock-free queue I have the following relaxing conditions:
Conceptually, the queue is implemented as follows: the read pointer is atomically incremented in pop(), and the write pointer is atomically incremented in push(). An additional "size" variable tracks the number of elements in the queue. This eliminates the need to perform arithmetic on the read and write indices. The size variable is atomically incremented after the entire write operation has taken place, i.e. after the data has been written to the backing storage and the write cursor has been incremented. I'm using a compare-and-swap (CAS) operation to atomically decrement size in pop(), and pop() only continues if size is non-zero. This way pop() should be guaranteed to return valid data.

My queue implementation is as follows. Note the debug code that halts execution whenever pop() attempts to read memory that has not previously been written by push(). This should never happen since, at least conceptually, pop() may only proceed if there are elements in the queue (there should be no underflows).
#include <atomic>
#include <cstdint>
#include <csignal>  // XXX for debugging

template <typename T>
class Queue {
private:
    uint32_t m_data_size;            // Number of elements allocated
    std::atomic<T> *m_data;          // Queue data, size is power of two
    uint32_t m_mask;                 // Bitwise AND mask for m_rd_ptr and m_wr_ptr
    std::atomic<uint32_t> m_rd_ptr;  // Circular buffer read pointer
    std::atomic<uint32_t> m_wr_ptr;  // Circular buffer write pointer
    std::atomic<uint32_t> m_size;    // Number of elements in the queue

    static uint32_t upper_power_of_two(uint32_t v) {
        v--;  // https://graphics.stanford.edu/~seander/bithacks.html
        v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
        v++;
        return v;
    }

public:
    struct Optional {  // Minimal replacement for std::optional
        bool good;
        T value;
        Optional() : good(false) {}
        Optional(T value) : good(true), value(std::move(value)) {}
        explicit operator bool() const { return good; }
    };

    Queue(uint32_t max_size)
        :  // XXX Allocate 1 MiB of additional memory for debugging purposes
          m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
          m_data(new std::atomic<T>[m_data_size]),
          m_mask(m_data_size - 1),
          m_rd_ptr(0),
          m_wr_ptr(0),
          m_size(0) {
        // XXX Debug code begin
        // Fill the memory with a marker so we can detect invalid reads
        for (uint32_t i = 0; i < m_data_size; i++) {
            m_data[i] = 0xDEADBEAF;
        }
        // XXX Debug code end
    }

    ~Queue() { delete[] m_data; }

    Optional pop() {
        // Atomically decrement the size variable
        uint32_t size = m_size.load();
        while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
        }

        // The queue is empty, abort
        if (size <= 0) {
            return Optional();
        }

        // Read the actual element, atomically increase the read pointer
        T res = m_data[(m_rd_ptr++) & m_mask].load();

        // XXX Debug code begin
        if (res == T(0xDEADBEAF)) {
            std::raise(SIGTRAP);
        }
        // XXX Debug code end
        return res;
    }

    void push(T t) {
        m_data[(m_wr_ptr++) & m_mask].store(t);
        m_size++;
    }

    bool empty() const { return m_size == 0; }
};
However, underflows do occur and can easily be triggered in a multi-threaded stress test. In this particular test I maintain two queues, q1 and q2. In the main thread I feed a fixed number of elements into q1. Two worker threads read from q1 and push onto q2 in a tight loop. The main thread reads data from q2 and feeds it back to q1.

This works fine if there is only one worker thread (single-producer/single-consumer) or as long as all worker threads are on the same CPU as the main thread. However, it fails as soon as there are two worker threads that are explicitly scheduled onto a different CPU than the main thread.

The following code implements this test:
#include <pthread.h>
#include <thread>
#include <vector>

static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
                                   Queue<int> &queue_rd, Queue<int> &queue_wr) {
    for (size_t i = 0; i < (1UL << 24); i++) {
        auto res = queue_rd.pop();
        if (res) {
            queue_wr.push(res.value);
        }
    }
    done_count++;
}

static void set_thread_affinity(pthread_t thread, int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
                               &cpuset) != 0) {
        throw "Error while calling pthread_setaffinity_np";
    }
}

int main() {
    static constexpr uint32_t n_threads{2U};    // Number of worker threads
    //static constexpr uint32_t n_threads{1U};  // < Works fine
    static constexpr uint32_t max_size{16U};    // Elements in the queue
    std::atomic<uint32_t> done_count{0};        // Number of finished threads
    Queue<int> queue1(max_size), queue2(max_size);

    // Launch n_threads threads, make sure the main thread and the two worker
    // threads are on different CPUs.
    std::vector<std::thread> threads;
    for (uint32_t i = 0; i < n_threads; i++) {
        threads.emplace_back(queue_stress_test_main, std::ref(done_count),
                             std::ref(queue1), std::ref(queue2));
        set_thread_affinity(threads.back().native_handle(), 0);
    }
    set_thread_affinity(pthread_self(), 1);
    //set_thread_affinity(pthread_self(), 0);  // < Works fine

    // Pump data from queue2 into queue1
    uint32_t elems_written = 0;
    while (done_count < n_threads || !queue2.empty()) {
        // Initially fill queue1 with all values from 0..max_size-1
        if (elems_written < max_size) {
            queue1.push(elems_written++);
        }

        // Read elements from queue2 and put them into queue1
        auto res = queue2.pop();
        if (res) {
            queue1.push(res.value);
        }
    }

    // Wait for all threads to finish
    for (uint32_t i = 0; i < n_threads; i++) {
        threads[i].join();
    }
}
Most of the time this program triggers the trap in the queue code, which means that pop() attempts to read memory that has never been touched by push(), although pop() should only succeed if push() has been called at least as often as pop().
You can compile and run the above program with GCC/clang on Linux using
c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue
To reproduce this, concatenate the two code blocks above into queue.cpp.
Note that I'm a complete novice when it comes to lock-free data structures. I'm perfectly aware that there are plenty of battle-tested lock-free queue implementations for C++. However, I simply can't figure out why the above code does not work as intended.
Upvotes: 4
Views: 533
Reputation: 283793
You have two bugs, one of which can cause the failure you observe.
Let's look at your push code, except we'll allow only one operation per statement:
void push(T t)
{
    auto const claimed_index = m_wr_ptr++;               /* 1 */
    auto const claimed_offset = claimed_index & m_mask;  /* 2 */
    auto& claimed_data = m_data[claimed_offset];         /* 3 */
    claimed_data.store(t);                               /* 4 */
    m_size++;                                            /* 5 */
}
Now, for a queue with two producers, there is a window of vulnerability to a race condition between operations 1 and 4:
Before:
    m_rd_ptr == 1
    m_wr_ptr == 1
    m_size == 0

Producer A:
    /* 1 */ claimed_index = 1; m_wr_ptr = 2;
    /* 2 */ claimed_offset = 1;

Producer B:
    /* 1 */ claimed_index = 2; m_wr_ptr = 3;
    /* 2 */ claimed_offset = 2;
    /* 3 */ claimed_data = m_data[2];
    /* 4 */ claimed_data.store(t);
    /* 5 */ m_size = 1;

After:
    m_size == 1
    m_rd_ptr == 1
    m_wr_ptr == 3
    m_data[1] == 0xDEADBEAF
    m_data[2] == value_produced_by_B
The consumer now runs, sees m_size > 0, and reads from m_data[1] while increasing m_rd_ptr from 1 to 2. But m_data[1] hasn't been written by Producer A yet, and Producer B wrote to m_data[2].
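The interleaving above can be replayed deterministically on a single thread, which makes it easier to see that no exotic memory-ordering effect is needed to hit the marker. The following is only a sketch: the function name replay_push_race, the ReplayResult struct, the 4-slot buffer and the value 42 are made up for illustration, while the sentinel and the Before state are taken from the trace.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper type, only used to report what the consumer saw.
struct ReplayResult {
    bool size_check_passed;    // did the consumer's CAS on size succeed?
    uint32_t consumer_offset;  // slot the consumer read from
    uint32_t value_read;       // value found in that slot
};

// Replays the trace step by step on one thread (no real concurrency).
ReplayResult replay_push_race() {
    constexpr uint32_t kMarker = 0xDEADBEAF;  // same sentinel as the question
    constexpr uint32_t kMask = 3;             // assumed 4-slot buffer
    std::atomic<uint32_t> data[4];
    for (auto &slot : data) slot.store(kMarker);

    // "Before" state from the trace.
    std::atomic<uint32_t> rd_ptr{1}, wr_ptr{1}, size{0};

    // Producer A executes steps 1-2 only, then is preempted.
    uint32_t a_offset = (wr_ptr++) & kMask;  // claims slot 1, writes nothing yet

    // Producer B executes steps 1-5 to completion.
    uint32_t b_offset = (wr_ptr++) & kMask;  // claims slot 2
    data[b_offset].store(42);                // value_produced_by_B
    size++;                                  // size becomes 1

    // Consumer: the size check succeeds because B incremented size ...
    uint32_t s = size.load();
    bool ok = (s != 0) && size.compare_exchange_strong(s, s - 1);

    // ... but the read pointer still refers to A's unwritten slot.
    uint32_t c_offset = (rd_ptr++) & kMask;
    uint32_t value = data[c_offset].load();

    assert(a_offset == 1 && b_offset == 2);
    return ReplayResult{ok, c_offset, value};
}
```

Calling replay_push_race() from a main() and inspecting the result shows the consumer passing the size check while reading slot 1, which still holds the marker; the size counter says "one element exists" but says nothing about which slot holds it.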
The second bug is the complementary case in pop(), when a consumer thread is interrupted between the m_rd_ptr++ action and the .load() call. It can result in reading values out of order, potentially so far out of order that the queue has completely circled and overwritten the original value.
Just because two operations in a single source statement are atomic does not make the entire statement atomic.
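One known way to close both windows is to publish each slot individually instead of through a shared size counter. The sketch below follows the per-slot sequence number scheme of Dmitry Vyukov's bounded MPMC queue; it is a different technique from the code in the question, not a patch of it. The class name BoundedQueue and its interface are hypothetical, it assumes the capacity is a power of two and at least 2, it assumes T is default-constructible, and it uses the default sequentially consistent ordering throughout for simplicity.

```cpp
#include <atomic>
#include <cstdint>
#include <memory>
#include <utility>

template <typename T>
class BoundedQueue {  // hypothetical name, illustration only
    struct Slot {
        std::atomic<uint32_t> seq;  // tells whether the slot is free or filled
        T value;
    };
    uint32_t m_mask;
    std::unique_ptr<Slot[]> m_slots;
    std::atomic<uint32_t> m_wr_ptr{0};
    std::atomic<uint32_t> m_rd_ptr{0};

public:
    // Assumption: capacity is a power of two and >= 2.
    explicit BoundedQueue(uint32_t capacity)
        : m_mask(capacity - 1), m_slots(new Slot[capacity]) {
        for (uint32_t i = 0; i < capacity; i++) m_slots[i].seq.store(i);
    }

    bool push(T t) {
        uint32_t pos = m_wr_ptr.load();
        for (;;) {
            Slot &slot = m_slots[pos & m_mask];
            int32_t diff = static_cast<int32_t>(slot.seq.load() - pos);
            if (diff == 0) {
                // Slot is free for ticket `pos`; try to claim the ticket.
                if (m_wr_ptr.compare_exchange_weak(pos, pos + 1)) {
                    slot.value = std::move(t);
                    slot.seq.store(pos + 1);  // publish: this slot is readable
                    return true;
                }
            } else if (diff < 0) {
                return false;  // slot not yet consumed: queue is full
            } else {
                pos = m_wr_ptr.load();  // another producer took this ticket
            }
        }
    }

    bool pop(T &out) {
        uint32_t pos = m_rd_ptr.load();
        for (;;) {
            Slot &slot = m_slots[pos & m_mask];
            int32_t diff = static_cast<int32_t>(slot.seq.load() - (pos + 1));
            if (diff == 0) {
                // Slot was published for ticket `pos`; try to claim it.
                if (m_rd_ptr.compare_exchange_weak(pos, pos + 1)) {
                    out = std::move(slot.value);
                    slot.seq.store(pos + m_mask + 1);  // free for the next lap
                    return true;
                }
            } else if (diff < 0) {
                return false;  // slot not yet written: queue is (or looks) empty
            } else {
                pos = m_rd_ptr.load();  // another consumer took this ticket
            }
        }
    }
};
```

Because a consumer only proceeds when the sequence number of its own slot says the data is there, a producer that has claimed an index but not yet stored its value simply makes pop() report "empty" for the moment, rather than handing out unwritten memory. The same check prevents a producer from overwriting a slot that a stalled consumer has claimed but not yet read.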
Upvotes: 3