Reputation: 2594
I have trouble understanding the memory model. Here is the example (I tried to simplify it, but it's still a bit long) that I will base my question on:
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <random>
#include <thread>

constexpr size_t hardware_destructive_interference_size = 128;

constexpr int maxWait = 1000;
constexpr int experimentRuns = 100;
constexpr int experimentLength = 1'000;

static void waitRandom() {
    if (maxWait == 0) return;
    thread_local std::mt19937 gen{std::random_device{}()};
    std::uniform_int_distribution dist(0, maxWait);
    auto waitIterations = dist(gen);
    for (int i{0}; i < waitIterations; ++i) {
        [[maybe_unused]] volatile int doNotOptimize = 0;
    }
}

template <typename INPUT>
class Worker {
    using input_t = INPUT;
    using func_t = void (*)(input_t);

public:
    Worker(func_t fun) {
        mThread = std::thread([this, fun]() mutable {
            while (!stop()) {
                mainLoop(fun);
            }
        });
    }

    ~Worker() {
        {
            std::lock_guard lk{mMutex};
            mStop.store(true, std::memory_order_relaxed);
        }
        mCV.notify_one();
        mThread.join();
    }

    bool ready() { return !mWorkerBusy.load(std::memory_order_acquire); }

    template <typename T>
    void submitJob(T&& input) {
        assert(ready());
        {
            std::lock_guard lk{mMutex};
            mInput.slot = std::forward<T>(input);
        }
        mWorkerBusy.store(true, std::memory_order_relaxed);
        mCV.notify_one();
    }

private:
    void mainLoop(func_t& fun) {
        waitForJob();
        if (stop()) {
            return;
        }
        fun(input());
        completeJob();
    }

    void waitForJob() {
        assert(!inputValid());
        mWorkerBusy.store(false, std::memory_order_release);
        {
            std::unique_lock lk{mMutex};
            mCV.wait(lk, [&] { return inputValid() || stop(); });
        }
    }

    void completeJob() { mInput.slot.reset(); }

    bool stop() const { return mStop.load(std::memory_order_relaxed); }

    bool inputValid() const { return mInput.slot.has_value(); }

    input_t& input() {
        assert(inputValid());
        return *mInput.slot;
    }

private:
    std::thread mThread;
    std::mutex mMutex;
    std::condition_variable mCV;
    struct {
        alignas(
            hardware_destructive_interference_size) std::optional<input_t> slot;
    } mInput;
    std::atomic_bool mWorkerBusy = true;
    std::atomic_bool mStop = false;
};

static void asyncProcess(int) { waitRandom(); }

static auto now() { return std::chrono::high_resolution_clock::now(); }

static void runExperiment() {
    Worker w{asyncProcess};
    const auto start = now();
    int input{0};
    for (int j{0}; j < experimentLength; ++j) {
        do {
            ++input;
        } while (!w.ready());
        w.submitJob(input);
    }
    const auto duration = now() - start;
    const auto durationUs =
        std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
    const auto usPerSample = static_cast<double>(durationUs) / input;
    std::cout << usPerSample << "us/Sample" << std::endl;
}

int main() {
    using namespace std::literals::chrono_literals;
    for (int i{0}; i < experimentRuns; ++i) {
        runExperiment();
        std::cerr << i + 1 << " experiment(s) done\n\n";
    }
}
Basically, the main thread produces data and a worker thread consumes it, but they may not run at the same pace (producing is faster than consuming).
One of my misunderstandings lies here:
void waitForJob() {
    assert(!inputValid());
    mWorkerBusy.store(false, std::memory_order_release);
    {
        std::unique_lock lk{mMutex};
        mCV.wait(lk, [&] { return inputValid() || stop(); });
    }
}
What would prevent the store from being executed after the wait, leaving the program in an undesired state (possibly with mWorkerBusy remaining true forever, preventing the worker from ever being woken up)?
I've got the same issue with:
template <typename T>
void submitJob(T&& input) {
    assert(ready());
    {
        std::lock_guard lk{mMutex};
        mInput.slot = std::forward<T>(input);
    }
    mWorkerBusy.store(true, std::memory_order_relaxed);
    mCV.notify_one();
}
What would prevent notify_one from being called first (for instance)?
[EDIT] follow-up posted here
Upvotes: 3
Views: 207
Reputation: 39658
mWorkerBusy.store(false, std::memory_order_release);
{
    std::unique_lock lk{mMutex};
    mCV.wait(lk, [&] { return inputValid() || stop(); });
}
What would prevent the store from being executed after the wait, leaving the program in an undesired state (possibly mWorkerBusy remaining true forever, preventing the worker from being woken up)?
The memory order doesn't guarantee you this. std::memory_order_release prevents reads and writes on the current thread from being reordered after the store, but memory operations could still be reordered before it, leading to the problem you've described.
What actually protects you here is the std::mutex. When you lock a mutex:
Synchronization: Prior unlock() operations on the same object synchronize with this operation.
- [requirements.mutex.general]/8
9 An evaluation A inter-thread happens before an evaluation B if
- A synchronizes with B, or
- [...]
10 An evaluation A happens before an evaluation B (or, equivalently, B happens after A) if:
- A is sequenced before B, or
- A inter-thread happens before B.
Leading back to your example:
- the store is sequenced before unlock() on the producer thread
- unlock() inter-thread happens before lock() on a consumer thread
- therefore, the store happens before lock() on a consumer thread
In simpler terms, when you lock a mutex, you see what other threads have done before unlocking it.
HOWEVER, there are some cases in your program with NO synchronization, and that can have unintended effects:
bool ready() { return !mWorkerBusy.load(std::memory_order_acquire); }
// ...
do {
    ++input;
} while (!w.ready());
w.submitJob(input);
This code is a mess. There is no form of synchronization, so !w.ready() might remain true in this loop indefinitely, with the producer never seeing any change.
What might save you is:
Implementations should make atomic stores visible to atomic loads within a reasonable amount of time.
In case you're wondering why there is no synchronization: you're only loading a value. std::memory_order_acquire means that if we see one change to mWorkerBusy, we must also see the preceding changes. However, it doesn't guarantee that we read the most recent value. In general, no synchronization between atomics takes place as long as you're only loading.
{
    std::lock_guard lk{mMutex};
    mInput.slot = std::forward<T>(input);
}
mWorkerBusy.store(true, std::memory_order_relaxed);
mCV.notify_one();
What would prevent notify_one from being called first (for instance)?
Absolutely nothing. You're using std::memory_order_relaxed, so the operation can be reordered on the current thread past the .notify_one(). This is a mistake! Never use std::memory_order_relaxed if the order of loads/stores relative to other operations matters!
However, the condition variable performs some form of synchronization through the mutex. Whichever thread gets notified will acquire mMutex, and will see mInput.slot = std::forward<T>(input);, because it happened before mMutex.unlock() (implicitly called through the guard).
Even with the "wrong" memory order, we can hijack the mutex to synchronize anyway:
{
    std::lock_guard lk{mMutex};
    mInput.slot = std::forward<T>(input);
    // the atomic store is now sequenced before mMutex.unlock()
    mWorkerBusy.store(true, std::memory_order_relaxed);
}
// mMutex.unlock() inter-thread happens before mMutex.lock() elsewhere, which
// makes the change to mWorkerBusy visible
As you can see, your use of atomics makes very little sense in both of these cases. You expect atomics to provide synchronization, but it's actually the condition variables and mutexes that do this.
A better way to synchronize producers and consumers is to use multiple condition variables. The consumers and producers can then notify one another, and there are no memory issues if you use std::mutex properly.
Another threading primitive that is specifically designed for synchronizing producers and consumers is std::binary_semaphore/std::counting_semaphore, which would be a much better tool in this case.
Upvotes: 1