Reputation: 2734
I previously posted a question about a producer/consumer threading model.
The answer by @JanSchultke and the comments there led me to propose a new implementation:
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
// #define LOG
// #define USEACTIVESLEEP
#ifdef USEACTIVESLEEP
#define MAYBEUNUSED(var) static_cast<void>(var)
// helpers to sleep for short periods of time
// active wait, because putting the thread to sleep has too large an overhead
// to allow for short delays
namespace {
// Iterations per nanosec
double gk_ItPerns;
void EstimateItPerns() noexcept {
auto start = std::chrono::steady_clock::now();
constexpr std::size_t NbIt{1000000};
for (size_t i = 0; i < NbIt; ++i) {
volatile size_t DoNotOptimize = 0;
        MAYBEUNUSED(DoNotOptimize);
}
auto end = std::chrono::steady_clock::now();
auto delay =
std::chrono::duration_cast<std::chrono::nanoseconds>(end - start)
.count();
gk_ItPerns = static_cast<double>(NbIt) / static_cast<double>(delay);
}
void ActiveSleep(double i_ns) noexcept {
std::size_t NbIt = static_cast<std::size_t>(i_ns * gk_ItPerns);
for (size_t i = 0; i < NbIt; ++i) {
volatile size_t DoNotOptimize = 0;
        MAYBEUNUSED(DoNotOptimize);
}
}
} // namespace
#endif
class CAsyncAlgo {
public:
using Data_t = size_t;
private:
static constexpr size_t mNbData = 1024;
size_t mWorkingIndex = 1;
size_t mBufferIndex = 0;
// type of data buffer
using DataBuffer_t = std::array<Data_t, mNbData>;
size_t mIndex = 0;
bool mHasData = false;
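    // double buffering: the producer fills mSamples[mBufferIndex] while the
    // worker processes mSamples[mWorkingIndex]; SubmitJob swaps the indices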
std::array<DataBuffer_t, 2> mSamples;
// Mutex for condition_variable and atomics
std::mutex mMutex;
// Condition variable used to wake up the working thread
std::condition_variable mWakeUp;
// To stop the worker
std::atomic<bool> mStop{false};
// Is an Algo instance running?
std::atomic<bool> mBusy{false};
// Can an Algo instance be launched (for testing spurious wake-up)?
std::atomic<bool> mReady{false};
// working thread
std::thread Worker;
// WorkLoad internals
// previous seen max value in buffer
Data_t mMaxVal = 0;
// number of processed data
Data_t mProcessed = 0;
private:
bool Stop() const noexcept {
// 1- no synch needed?
return (mStop.load(std::memory_order_relaxed));
}
bool Ready() const noexcept {
        // 2- std::memory_order_acquire because it needs to synchronize with
        // the store in main
return (mReady.load(std::memory_order_acquire));
}
void WaitForJob() {
std::unique_lock<std::mutex> lock(mMutex);
#ifdef LOG
std::cout << "entering waiting state " << std::boolalpha << mStop
<< std::endl;
#endif
        // 3- std::memory_order_relaxed is not possible, otherwise it could
        // theoretically be reordered before the previous WorkLoad call
mBusy.store(false, std::memory_order_release);
assert(lock.owns_lock());
mWakeUp.wait(lock, [this]() -> bool { return (Stop() || Ready()); });
assert(lock.owns_lock());
assert(mBusy || Stop());
        // 4- std::memory_order_relaxed because no need to synchronize; mReady
        // is loaded from the same thread
mReady.store(false, std::memory_order_relaxed);
#ifdef LOG
std::cout << "waked up " << std::this_thread::get_id() << std::endl;
#endif
}
    // Check that the working buffer holds increasing successive integers,
    // possibly wrapping at some point. The max value must be strictly greater
    // than the one of the previous call: {5,6,7,3,4} is valid if the previous
    // greatest value is strictly smaller than 7, {5,6,7,2,4} is invalid.
    // The smallest value must also be strictly greater than mMaxVal, as
    // buffers do not overlap.
void WorkLoad() {
Data_t Max = mSamples[mWorkingIndex][mNbData - 1];
Data_t Min = mSamples[mWorkingIndex][0];
for (size_t i = 1; i < mNbData; ++i) {
if (mSamples[mWorkingIndex][i] !=
mSamples[mWorkingIndex][i - 1] + 1) {
assert(mSamples[mWorkingIndex][i - 1] ==
(mSamples[mWorkingIndex][i] + mNbData - 1));
Max = mSamples[mWorkingIndex][i - 1];
Min = mSamples[mWorkingIndex][i];
}
}
assert(Max > mMaxVal);
assert(Min > mMaxVal);
mMaxVal = Max;
mProcessed += mNbData;
}
void MainLoop() {
while (!Stop()) {
WaitForJob();
if (Stop()) {
return;
}
WorkLoad();
}
}
public:
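    // spawns the worker immediately; it parks in WaitForJob until SubmitJob
    // notifies it (or until destruction requests a stop)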
CAsyncAlgo() : Worker([this]() mutable -> void { MainLoop(); }) {}
void Push(Data_t const Sample, size_t) {
// writing one sample in current circular buffer
mSamples[mBufferIndex][mIndex] = Sample;
mIndex = (mIndex + 1) % mNbData;
if (mIndex == 0) {
// buffer is full
mHasData = true;
}
}
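    // producer-side check: the current buffer is full and the worker is
    // idle, so a job can be submitted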
bool IsReady() {
if (mHasData && mBusy.load(std::memory_order_acquire) == false) {
return true;
}
return false;
}
void SubmitJob() {
#ifdef LOG
std::cout << "SubmitJob" << std::endl;
#endif
{
std::lock_guard<std::mutex> lock(mMutex);
            // 5- std::memory_order_release because it needs to synchronize
            // with the load in the worker
mReady.store(true, std::memory_order_release);
// 6- std::memory_order_relaxed because no synch needed, read only
// by this thread
mBusy.store(true, std::memory_order_relaxed);
std::swap(mWorkingIndex, mBufferIndex);
mIndex = 0;
mHasData = false;
}
mWakeUp.notify_one();
}
void Run(double const, double &) const {
// NOP
}
// destructor
    ///\details finishes the computation and releases the resources
    ///\todo explicitly "close" the computation before the end of life of the
    /// object
~CAsyncAlgo() {
{
#ifdef LOG
std::cout << "closing" << std::endl;
#endif
std::lock_guard<std::mutex> lock(mMutex);
            // 7- std::memory_order_relaxed: on unlocking, it may synchronize
            // with the lock in wait; in that case, the worker will see true
mStop.store(true, std::memory_order_relaxed);
}
mWakeUp.notify_one();
if (Worker.joinable()) {
#ifdef LOG
std::cout << "waiting for last run" << std::endl;
#endif
Worker.join();
#ifdef LOG
std::cout << "finished" << std::endl;
#endif
std::cout << "Processed " << GetNbProcessed() << " data"
<< std::endl;
}
}
size_t GetNbProcessed() { return mProcessed; }
};
static constexpr size_t NbSamples = 1000000;
int main() {
CAsyncAlgo Algo;
std::cout << std::this_thread::get_id() << std::endl;
#ifdef USEACTIVESLEEP
EstimateItPerns();
std::size_t acc{0};
#endif
for (size_t i = 0; i < NbSamples; ++i) {
#ifdef USEACTIVESLEEP
double period = 10000.; // ns
// manage data production frequency
auto start = std::chrono::steady_clock::now();
#endif
CAsyncAlgo::Data_t data =
static_cast<CAsyncAlgo::Data_t>(i + 1); // 0 is reserved
Algo.Push(data, i);
#ifdef USEACTIVESLEEP
auto end = std::chrono::steady_clock::now();
        // no more synchronization needed, as only this thread is designed to
        // launch a new computation
if (static_cast<double>(
std::chrono::duration_cast<std::chrono::nanoseconds>(end -
start)
.count()) < period) {
ActiveSleep(
period -
static_cast<double>(
std::chrono::duration_cast<std::chrono::nanoseconds>(end -
start)
.count()));
}
end = std::chrono::steady_clock::now();
acc = acc + static_cast<std::size_t>(
std::chrono::duration_cast<std::chrono::microseconds>(
end - start)
.count());
#endif
if (Algo.IsReady()) {
#ifdef LOG
#ifdef USEACTIVESLEEP
std::cout << "Ready " << i << " "
<< static_cast<double>(acc) /
(static_cast<double>(i) + 1.)
<< " us/Sample" << std::endl;
#endif
#endif
Algo.SubmitJob();
}
double res;
Algo.Run(3.14, res);
}
#ifdef USEACTIVESLEEP
    std::cout << static_cast<double>(acc) / NbSamples << " us/Sample"
              << std::endl;
#endif
return 0;
}
I changed the data management and the workload a bit to make it clearer, I hope:
For the example, incoming data are merely increasing successive unsigned integers.
The workload merely checks that the different processed buffers do not overlap and actually contain successive integers.
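To make the wrap-around check concrete, here is the min/max extraction from WorkLoad as a standalone, single-threaded sketch (WrappedMinMax is just an illustrative name, not part of the class):
#include <array>
#include <cassert>
#include <cstddef>
#include <utility>
// Returns the {min, max} of a buffer holding successive integers with at
// most one wrap point, e.g. {5, 6, 7, 3, 4} -> {3, 7}.
template <std::size_t N>
std::pair<std::size_t, std::size_t>
WrappedMinMax(std::array<std::size_t, N> const &buf) {
    // these defaults hold when the buffer does not wrap
    std::size_t Min = buf[0];
    std::size_t Max = buf[N - 1];
    for (std::size_t i = 1; i < N; ++i) {
        if (buf[i] != buf[i - 1] + 1) { // wrap point found
            Max = buf[i - 1];
            Min = buf[i];
        }
    }
    return {Min, Max};
}
int main() {
    auto MinMax = WrappedMinMax(std::array<std::size_t, 5>{5, 6, 7, 3, 4});
    // the buffer is accepted when both values exceed the previous maximum
    assert(MinMax.first == 3 && MinMax.second == 7);
    return 0;
}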
Though the tests I have run so far worked, with multithreading it's hard to be sure it will not fail later...
The snippet above includes numbered comments (1 to 7) explaining my memory order choices.
Are they valid?
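For context, here is the handshake reduced to a bare, runnable sketch carrying the same numbered memory orders; this is only a stripped-down illustration for discussion (the names are mine), not the class above:
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
std::mutex m;
std::condition_variable cv;
std::atomic<bool> ready{false};
std::atomic<bool> busy{false};
int main() {
    std::thread worker([] {
        std::unique_lock<std::mutex> lock(m);
        busy.store(false, std::memory_order_release); // 3-
        // 2- acquire load, pairs with the release store in the producer
        cv.wait(lock, [] { return ready.load(std::memory_order_acquire); });
        // 4- relaxed, since ready is loaded from this same thread
        ready.store(false, std::memory_order_relaxed);
        std::cout << "job received" << std::endl;
    });
    {
        std::lock_guard<std::mutex> lock(m);
        ready.store(true, std::memory_order_release); // 5-
        busy.store(true, std::memory_order_relaxed);  // 6-
    }
    cv.notify_one();
    worker.join();
    return 0;
}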
Upvotes: 0
Views: 49