Oersted

Reputation: 2734

Follow-up on memory ordering in a simple producer/consumer example

I previously posted a question about a producer/consumer threading model.

The answer by @JanSchultke and the comments there led me to propose a new implementation:

#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

// #define LOG
// #define USEACTIVESLEEP

#ifdef USEACTIVESLEEP
#define MAYBEUNSUSED(var) static_cast<void>(var)
// functions to sleep for short periods of time
// active wait, because putting the thread to sleep has too large an overhead
// for such short delays
namespace {
// Iterations per nanosec
double gk_ItPerns;

void EstimateItPerns() noexcept {
    auto start = std::chrono::steady_clock::now();
    constexpr std::size_t NbIt{1000000};
    for (size_t i = 0; i < NbIt; ++i) {
        volatile size_t DoNotOptimize = 0;
        MAYBEUNSUSED(DoNotOptimize);
    }
    auto end = std::chrono::steady_clock::now();
    auto delay =
        std::chrono::duration_cast<std::chrono::nanoseconds>(end - start)
            .count();
    gk_ItPerns = static_cast<double>(NbIt) / static_cast<double>(delay);
}

void ActiveSleep(double i_ns) noexcept {
    std::size_t NbIt = static_cast<std::size_t>(i_ns * gk_ItPerns);
    for (size_t i = 0; i < NbIt; ++i) {
        volatile size_t DoNotOptimize = 0;
        MAYBEUNSUSED(DoNotOptimize);
    }
}
}  // namespace
#endif

class CAsyncAlgo {
   public:
    using Data_t = size_t;

   private:
    static constexpr size_t mNbData = 1024;

    size_t mWorkingIndex = 1;
    size_t mBufferIndex = 0;

    // type of data buffer
    using DataBuffer_t = std::array<Data_t, mNbData>;

    size_t mIndex = 0;
    bool mHasData = false;

    std::array<DataBuffer_t, 2> mSamples;

    // Mutex for condition_variable and atomics
    std::mutex mMutex;
    // Condition variable used to wake up the working thread
    std::condition_variable mWakeUp;
    // To stop the worker
    std::atomic<bool> mStop{false};
    // Is an Algo instance running?
    std::atomic<bool> mBusy{false};
    // Can an Algo instance be launched (for testing spurious wake-up)?
    std::atomic<bool> mReady{false};

    // working thread
    std::thread Worker;

    // WorkLoad internals
    // previous seen max value in buffer
    Data_t mMaxVal = 0;
    // number of processed data
    Data_t mProcessed = 0;

   private:
    bool Stop() const noexcept {
        // 1- no synch needed?
        return (mStop.load(std::memory_order_relaxed));
    }
    bool Ready() const noexcept {
        // 2- std::memory_order_acquire because it needs to synchronize with
        // the store in the main thread
        return (mReady.load(std::memory_order_acquire));
    }
    void WaitForJob() {
        std::unique_lock<std::mutex> lock(mMutex);
#ifdef LOG
        std::cout << "entering waiting state " << std::boolalpha << mStop
                  << std::endl;
#endif
        // 3- std::memory_order_relaxed is not possible, otherwise the store
        // could theoretically be reordered before the previous WorkLoad call
        mBusy.store(false, std::memory_order_release);
        assert(lock.owns_lock());
        mWakeUp.wait(lock, [this]() -> bool { return (Stop() || Ready()); });
        assert(lock.owns_lock());
        assert(mBusy || Stop());
        // 4- std::memory_order_relaxed because no synchronization is needed:
        // mReady is loaded from the same thread
        mReady.store(false, std::memory_order_relaxed);
#ifdef LOG
        std::cout << "waked up " << std::this_thread::get_id() << std::endl;
#endif
    }
    // Check that the working buffer holds increasing successive integers,
    // possibly wrapped at some point. The max value must be strictly greater
    // than the one of the previous call: {5,6,7,3,4} is valid if the previous
    // greatest value is strictly smaller than 7, {5,6,7,2,4} is invalid. The
    // smallest value must also be strictly greater than mMaxVal, as buffers
    // do not overlap.
    void WorkLoad() {
        Data_t Max = mSamples[mWorkingIndex][mNbData - 1];
        Data_t Min = mSamples[mWorkingIndex][0];
        for (size_t i = 1; i < mNbData; ++i) {
            if (mSamples[mWorkingIndex][i] !=
                mSamples[mWorkingIndex][i - 1] + 1) {
                assert(mSamples[mWorkingIndex][i - 1] ==
                       (mSamples[mWorkingIndex][i] + mNbData - 1));
                Max = mSamples[mWorkingIndex][i - 1];
                Min = mSamples[mWorkingIndex][i];
            }
        }
        assert(Max > mMaxVal);
        assert(Min > mMaxVal);
        mMaxVal = Max;
        mProcessed += mNbData;
    }
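    // Worker thread body: block until a job is submitted or a stop is
    // requested, then process the working buffer and go back to waiting.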
    void MainLoop() {
        while (!Stop()) {
            WaitForJob();
            if (Stop()) {
                return;
            }
            WorkLoad();
        }
    }

   public:
    CAsyncAlgo() : Worker([this]() mutable -> void { MainLoop(); }) {}
    void Push(Data_t const Sample, size_t) {
        // writing one sample in current circular buffer
        mSamples[mBufferIndex][mIndex] = Sample;
        mIndex = (mIndex + 1) % mNbData;
        if (mIndex == 0) {
            // buffer is full
            mHasData = true;
        }
    }
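    // Producer-side check: true when the write buffer is full and the worker
    // is currently idle, i.e. a new job can be submitted.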
    bool IsReady() {
        if (mHasData && mBusy.load(std::memory_order_acquire) == false) {
            return true;
        }
        return false;
    }

    void SubmitJob() {
#ifdef LOG
        std::cout << "SubmitJob" << std::endl;
#endif
        {
            std::lock_guard<std::mutex> lock(mMutex);
            // 5- std::memory_order_release because it needs to synchronize
            // with the load in the worker
            mReady.store(true, std::memory_order_release);
            // 6- std::memory_order_relaxed because no synchronization is
            // needed, it is read only by this thread
            mBusy.store(true, std::memory_order_relaxed);
            std::swap(mWorkingIndex, mBufferIndex);
            mIndex = 0;
            mHasData = false;
        }
        mWakeUp.notify_one();
    }
    void Run(double const, double &) const {
        // NOP
    }

    // destructor
    ///\details finishes the computation and releases resources
    ///\todo explicitly "close" the computation before the end of life of the
    /// object
    ~CAsyncAlgo() {
        {
#ifdef LOG
            std::cout << "closing" << std::endl;
#endif
            std::lock_guard<std::mutex> lock(mMutex);
            // 7- std::memory_order_relaxed: the unlock may synchronize with
            // the lock in wait; in that case, the worker will see true
            mStop.store(true, std::memory_order_relaxed);
        }
        mWakeUp.notify_one();
        if (Worker.joinable()) {
#ifdef LOG
            std::cout << "waiting for last run" << std::endl;
#endif
            Worker.join();
#ifdef LOG
            std::cout << "finished" << std::endl;
#endif
            std::cout << "Processed " << GetNbProcessed() << " data"
                      << std::endl;
        }
    }

    size_t GetNbProcessed() { return mProcessed; }
};

static constexpr size_t NbSamples = 1000000;

int main() {
    CAsyncAlgo Algo;

    std::cout << std::this_thread::get_id() << std::endl;

#ifdef USEACTIVESLEEP
    EstimateItPerns();
    std::size_t acc{0};
#endif
    for (size_t i = 0; i < NbSamples; ++i) {
#ifdef USEACTIVESLEEP
        double period = 10000.;  // ns
        // manage data production frequency
        auto start = std::chrono::steady_clock::now();
#endif
        CAsyncAlgo::Data_t data =
            static_cast<CAsyncAlgo::Data_t>(i + 1);  // 0 is reserved
        Algo.Push(data, i);
#ifdef USEACTIVESLEEP
        auto end = std::chrono::steady_clock::now();
        // no more synchronization needed, as only this thread is designed to
        // launch a new computation
        if (static_cast<double>(
                std::chrono::duration_cast<std::chrono::nanoseconds>(end -
                                                                     start)
                    .count()) < period) {
            ActiveSleep(
                period -
                static_cast<double>(
                    std::chrono::duration_cast<std::chrono::nanoseconds>(end -
                                                                         start)
                        .count()));
        }
        end = std::chrono::steady_clock::now();
        acc = acc + static_cast<std::size_t>(
                        std::chrono::duration_cast<std::chrono::microseconds>(
                            end - start)
                            .count());
#endif
        if (Algo.IsReady()) {
#ifdef LOG
#ifdef USEACTIVESLEEP
            std::cout << "Ready " << i << " "
                      << static_cast<double>(acc) /
                             (static_cast<double>(i) + 1.)
                      << " us/Sample" << std::endl;
#endif
#endif
            Algo.SubmitJob();
        }
        double res;
        Algo.Run(3.14, res);
    }

#ifdef USEACTIVESLEEP
    std::cout << static_cast<double>(acc) / NbSamples << " us/Sample"
              << std::endl;
#endif

    return 0;
}

Live

I changed the data handling and the workload a bit in order to make them clearer, I hope:

For the example, incoming data are merely increasing successive unsigned integers.
The workload merely checks that the different processed buffers do not overlap and actually contain successive integers.

Though the tests I have run so far worked, with multithreading it is hard to be sure it will not fail later...
The snippet above includes numbered comments (from 1 to 7) about my memory-order choices. Are they valid?
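
For reference, here is a reduced, self-contained sketch of the handshake that comments 2 and 5 refer to, with the class, the double buffer and the workload stripped away. The names ready and payload are only illustrative, they are not taken from the code above:

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

int main() {
    std::mutex m;
    std::condition_variable cv;
    std::atomic<bool> ready{false};
    int payload = 0;  // non-atomic data published before the release store

    std::thread consumer([&]() {
        std::unique_lock<std::mutex> lock(m);
        // 2- acquire load in the wait predicate, as in Ready()
        cv.wait(lock, [&]() { return ready.load(std::memory_order_acquire); });
        // data written before the release store should be visible here
        std::cout << "payload = " << payload << std::endl;
    });

    {
        std::lock_guard<std::mutex> lock(m);
        payload = 42;
        // 5- release store under the same mutex, as in SubmitJob()
        ready.store(true, std::memory_order_release);
    }
    cv.notify_one();
    consumer.join();
    return 0;
}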

Upvotes: 0

Views: 49

Answers (0)
