mbrt
mbrt

Reputation: 2178

C++11 lockfree single producer single consumer: how to avoid busy wait

I'm trying to implement a class that uses two threads: one for the producer and one for the consumer. The current implementation does not use locks:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>

using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

The application needs to enqueue work items for a certain amount of time and then sleep waiting for an event. This is a minimal main that simulates the behavior:

int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

I'm pretty sure that my implementation is bugged: what if the worker thread completes and before executing working_ = false, another enqueue comes? Is it possible to make my code thread safe without using locks?

The solution requires:

Edit

I did another implementation of the Worker class, based on your suggestions. Here is my second attempt:

class Worker
{
public:
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false) { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

I introduced the worker_.join() inside the enqueue method. This can impact the performances, but in very rare cases (when the queue gets empty and before the thread exits, another enqueue comes). The working_ variable is now an atomic_flag that is set in enqueue and cleared in work. The Additional while after working_.clear() is needed because if another value is pushed, before the clear, but after the while, the value is not processed.

Is this implementation correct?

I did some tests and the implementation seems to work.

OT: Is it better to put this as an edit, or an answer?

Upvotes: 9

Views: 2232

Answers (4)

Pablo H
Pablo H

Reputation: 659

Very partial answer: I think all those atomics, semaphores and states are a back-communication channel, from "the thread" to "the Worker". Why not use another queue for that? At the very least, thinking about it will help you around the problem.

Upvotes: 0

mbrt
mbrt

Reputation: 2178

This is my solution of the question. I don't like very much answering myself, but I think showing actual code may help others.

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
// I used this semaphore class: https://gist.github.com/yohhoy/2156481
#include "binsem.hpp"

using Queue =
    boost::lockfree::spsc_queue<
        int,
        boost::lockfree::capacity<1024>>;

class Worker
{
public:
    // the worker thread starts in the constructor
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
        , worker_([this]{ work(); })
    { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        semaphore_.signal();
        worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set())
            // signal to the worker thread to wake up
            semaphore_.signal();
        return enqueued;
    }

    void work() {
        int value;
        // the worker thread continue to live
        while (!done_) {
            // wait the start signal, sleeping
            semaphore_.wait();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
            working_.clear();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    binsem semaphore_;
    Queue queue_;
    std::thread worker_;
};

I tried the suggestion of @Cameron, to not shutdown the thread and adding a semaphore. This actually is used only in the first enqueue and in the last work. This is not lock-free, but only in these two cases.

I did some performance comparison, between my previous version (see my edited question), and this one. There are no significant differences, when there are not many start and stop. However, the enqueue is 10 times faster when it have to signal the worker thread, instead of starting a new thread. This is a rare case, so it is not very important, but anyway it is an improvement.

This implementation satisfies:

  • lock-free in the common case (when enqueue and work are busy);
  • no busy wait in case for long time there are not enqueue
  • the destructor exits as soon as possible
  • correctness?? :)

Upvotes: 0

eerorika
eerorika

Reputation: 238351

what if the worker thread completes and before executing working_ = false, another enqueue comes?

Then the value will be pushed to the queue but will not be processed until another value is enqueued after the flag is set. You (or your users) may decide whether that is acceptable. This can be avoided using locks, but they're against your requirements.

The code may fail if the running thread is about to finish and sets working_ = false; but hasn't stopped running before next value is enqueued. In that case your code will call operator= on the running thread which results in a call to std::terminate according to the linked documentation.

Adding worker_.join() before assigning the worker to a new thread should prevent that.

Another problem is that queue_.push may fail if the queue is full because it has a fixed size. Currently you just ignore the case and the value will not be added to the full queue. If you wait for queue to have space, you don't get fast enqueue (in the edge case). You could take the bool returned by push (which tells if it was successful) and return it from enqueue. That way the caller may decide whether it wants to wait or discard the value.

Or use non-fixed size queue. Boost has this to say about that choice:

Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way to achieve lock-freedom.

Upvotes: 2

Yakk - Adam Nevraumont
Yakk - Adam Nevraumont

Reputation: 275385

Your worker thread needs more than 2 states.

  • Not running
  • Doing tasks
  • Idle shutdown
  • Shutdown

If you force shut down, it skips idle shutdown. If you run out of tasks, it transitions to idle shutdown. In idle shutdown, it empties the task queue, then goes into shutting down.

Shutdown is set, then you walk off the end of your worker task.

The producer first puts things on the queue. Then it checks the worker state. If Shutdown or Idle shutdown, first join it (and transition it to not running) then launch a new worker. If not running, just launch a new worker.

If the producer wants to launch a new worker, it first makes sure that we are in the not running state (otherwise, logic error). We then transition to the Doing tasks state, and then we launch the worker thread.

If the producer wants to shut down the helper task, it sets the done flag. It then checks the worker state. If it is anything besides not running, it joins it.

This can result in a worker thread that is launched for no good reason.

There are a few cases where the above can block, but there where a few before as well.

Then, we write a formal or semi-formal proof that the above cannot lose messages, because when writing lock free code you aren't done until you have a proof.

Upvotes: 1

Related Questions