kingmeng
kingmeng

Reputation: 73

C++ Lock-Free Queue

I designed this function, which is used to implement Lock-Free queues, but it has deadlock problems during the actual execution process(dequeue). I checked it many times and I thought it was fine. I run on the x86 platform and have 12 threads to read and write.

Now I want to figure out what caused the situation, and I want to know if this is a thread-safe design or where it needs to continue to be optimized for higher performance.

12 threads to dequeue and 12 threads to enqueue.
Development tools : Visual studio 2019
I am very much looking forward to your reply. thank u.


#include <iostream>
#include <functional>

#include<atomic>
#include<cassert>
#include<thread>
#include<vector>
template<typename T>
class mpmc_queue_t
{
public:
    mpmc_queue_t(size_t size) :
        _size(size),
        _mask(_size - 1),
        _buffer((node_t*)(new aligned_node_t[_size]))
    {
        assert((_size != 0) && ((_size & (~_size + 1)) == _size));
        _read.store(0, std::memory_order_relaxed);
        _write.store(0, std::memory_order_relaxed);
        for (size_t i = 0; i < _size; ++i)
        {
            _buffer[i].status.store(false, std::memory_order_relaxed);
        }
    }
    ~mpmc_queue_t()
    {
        delete[] _buffer;
    }
    bool enqueue(const T& data)
    {
        auto write = _write.fetch_add(1, std::memory_order_relaxed);
        node_t* node = &_buffer[write & _mask];
        while (true)
        {
            if (!node->status.load(std::memory_order_acquire))
            {
                node->data = data;
                node->status.store(true, std::memory_order_release);
                return true;
            }
            std::this_thread::yield();
        }
    }
    bool dequeue(T& data)
    {
        auto read = _read.fetch_add(1, std::memory_order_relaxed);
        node_t* node = &_buffer[read & _mask];
        while (true)
        {
            if (node->status.load(std::memory_order_acquire))
            {
                data = node->data;
                node->status.store(false, std::memory_order_release);
                return true;
            }
            std::this_thread::yield();
        }
    }

private:
    struct node_t
    {
        T                   data;
        std::atomic_bool    status;
    };
    typedef typename std::aligned_storage<sizeof(node_t), std::alignment_of<node_t>::value>::type aligned_node_t;
    typedef char cache_line_pad_t[64];
    cache_line_pad_t    _pad0;
    size_t              _size;
    size_t              _mask;
    node_t* const       _buffer;
    cache_line_pad_t    _pad1;
    std::atomic_size_t  _read;
    cache_line_pad_t    _pad2;
    std::atomic_size_t  _write;
    cache_line_pad_t    _pad3;
};

#define COUNT 100000000
#define THREAD 12
typedef mpmc_queue_t<size_t> queue_t;

template<typename T>
void consumer_func(T* queue)
{
    size_t count = COUNT;
    size_t value = 0;

    while (count > 0) {
        if (queue->dequeue(value)) {
            --count;
        }
    }
    std::cout << "consumer_func ID: " << std::this_thread::get_id() << " ok" << std::endl;
}

template<typename T>
void producer_func(T* queue)
{
    size_t count = COUNT;
    while (count > 0) {
        if (queue->enqueue(count)) {
            --count;
        }
    }
    std::cout << "producer_func ID: " << std::this_thread::get_id() << " ok" << std::endl;
}

template<typename T>
long double
run_test(
    T producer_func,
    T consumer_func)
{
    typedef std::chrono::high_resolution_clock clock_t;
    typedef std::chrono::time_point<clock_t> time_t;
    time_t start;
    time_t end;
    start = clock_t::now();
    std::thread producer0(producer_func);
    std::thread producer1(producer_func);
    std::thread producer2(producer_func);
    std::thread producer3(producer_func);
    std::thread producer4(producer_func);
    std::thread producer5(producer_func);
    std::thread producer6(producer_func);
    std::thread producer7(producer_func);
    std::thread producer8(producer_func);
    std::thread producer9(producer_func);
    std::thread producer10(producer_func);
    std::thread producer11(producer_func);

    std::thread consumer0(consumer_func);
    std::thread consumer1(consumer_func);
    std::thread consumer2(consumer_func);
    std::thread consumer3(consumer_func);
    std::thread consumer4(consumer_func);
    std::thread consumer5(consumer_func);
    std::thread consumer6(consumer_func);
    std::thread consumer7(consumer_func);
    std::thread consumer8(consumer_func);
    std::thread consumer9(consumer_func);
    std::thread consumer10(consumer_func);
    std::thread consumer11(consumer_func);

    producer0.join();
    producer1.join();
    producer2.join();
    producer3.join();
    producer4.join();
    producer5.join();
    producer6.join();
    producer7.join();
    producer8.join();
    producer9.join();
    producer10.join();
    producer11.join();

    consumer0.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();
    consumer4.join();
    consumer5.join();
    consumer6.join();
    consumer7.join();
    consumer8.join();
    consumer9.join();
    consumer10.join();
    consumer11.join();
    end = clock_t::now();

    return
        (end - start).count()
        * ((double)std::chrono::high_resolution_clock::period::num
            / std::chrono::high_resolution_clock::period::den);
}


int main()
{

    {
        queue_t queue(65536);
        long double seconds = run_test(std::bind(&producer_func<queue_t>, &queue),
            std::bind(&consumer_func<queue_t>, &queue));

        std::cout << "The control group completed "
            << COUNT * THREAD
            << " iterations in "
            << seconds
            << " seconds. "
            << ((long double)COUNT * THREAD / seconds) / 1000000
            << " million enqueue/dequeue pairs per second."
            << std::endl;
    }

    return 0;
}

Upvotes: 1

Views: 1374

Answers (1)

mpoeter
mpoeter

Reputation: 2949

This design is not lock-free but "lock-less", because a thread in dequeue may have to wait for an enqueue operation on that item to finish (signaled via status), i.e., it does not provide the progress guarantee required by lock-freedom.

As Matt Timmermans already pointed out, there is a problem when indexes wrap around. There is not guarantee that the status of the node has already been updated, or, since the operations on status are not sequentially consistent, whether this update is visible. This can result in a data race when, after a wrap around, two threads (in different rounds) try to push to the same node, because both observed node->status.load() returning false.

To solve this, you can use a counter instead of a bool in the node to track the current round the node belongs to (similar to how it's done in this queue by Dmitry Vukov: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue)

Upvotes: 1

Related Questions