Damir Tenishev

Reputation: 3402

Two threads alternating (ping-pong) execution

Is it possible to guarantee alternating execution of two threads without using two atomics (or other primitives such as semaphores)? I want to make sure that both threads execute; preferably, if one thread has been waiting, it should go first once the other has finished its current iteration.

A plain std::mutex doesn't work, since one thread can monopolize it and never give the other thread a chance to take it in time. The same issue occurs even with std::atomic_flag::wait.

The desired result can be achieved with two atomic flags, as in this demo:

#include <atomic>
#include <thread>
#include <iostream>
#include <vector>

std::atomic_flag ping_lock_flag = ATOMIC_FLAG_INIT;
std::atomic_flag pong_lock_flag = ATOMIC_FLAG_INIT;
std::atomic_int idx = 0;
std::vector<int> v(10);

void update()
{
    ping_lock_flag.wait(false, std::memory_order_relaxed);
    ping_lock_flag.clear(std::memory_order_release);

    std::cout << "update start\n";

    std::size_t index = idx.load();
    if (index < v.size()) {
        v[index] = 1;
        idx.fetch_add(1);
    }

    std::cout << "update end\n";

    pong_lock_flag.test_and_set(std::memory_order_acquire);
    pong_lock_flag.notify_one();
}

void render()
{
    // Some hard work before locking (prologue)
    // ...

    pong_lock_flag.wait(false, std::memory_order_relaxed);
    pong_lock_flag.clear(std::memory_order_release);

    std::cout << "render start\n";

    std::size_t index = idx.load();
    if (index < v.size()) {
        v[index] = 2;
        idx.fetch_add(1);
    }

    std::cout << "render end\n";

    ping_lock_flag.test_and_set(std::memory_order_acquire);
    ping_lock_flag.notify_one();

    // Some hard work after locking (epilogue)
    // ...
}

int main()
{
    std::jthread update_thread([&]() { while (idx.load() < v.size()) { update(); } });
    std::jthread render_thread([&]() { while (idx.load() < v.size()) { render(); } });

    ping_lock_flag.test_and_set(std::memory_order_acquire);

    update_thread.join();
    render_thread.join();

    for (auto& value : v) {
        std::cout << value << ", ";
    }

    std::cout << '\n';
}

with the output:

update start
update end
render start
render end
update start
update end
render start
render end
update start
update end
render start
render end
update start
update end
render start
render end
update start
update end
render start
render end
update start
update end
1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 

The question is if it is possible to get the same result simpler with modern C++ multithreading tools and if not, why?

Reasoning

The purpose is to have operations like update and render in separate threads, but synchronized. Why separate threads in this case? Because the synchronization covers only the work on internal data, while some framework render operations, such as clearing the device or copying the results to video memory, can run in parallel with the update. I marked these parts as prologue and epilogue in the render code above. The desire to run this code in parallel with update is the key reason for splitting the work into two threads here. If there is a better way to do so, please advise.

Without ping-pong there is a good chance that the code will only ever run update and never render, or vice versa.

With this I can relax the requirements somewhat; namely, I am fine with some spurious wake-ups, or with very rarely missing a wake-up, as long as it doesn't lead to fully stalling one or both threads.

Update

On second thought I came to the following solution, but I am still not sure that it is safe (taking into account all the reservations from the Reasoning section).

Here is the demo:

#include <atomic>
#include <thread>
#include <iostream>
#include <vector>

std::atomic_flag ping_lock_flag = ATOMIC_FLAG_INIT;
std::atomic_int idx = 0;
std::vector<int> v(10);

void update()
{
    ping_lock_flag.wait(false, std::memory_order_relaxed);

    std::cout << "update start\n";

    std::size_t index = idx.load();
    if (index < v.size()) {
        v[index] = 1;
        idx.fetch_add(1);
    }

    std::cout << "update end\n";

    ping_lock_flag.clear(std::memory_order_release);
    ping_lock_flag.notify_one();
}

void render()
{
    // Some hard work before locking (prologue)
    // ...

    ping_lock_flag.wait(true, std::memory_order_relaxed);

    std::cout << "render start\n";

    std::size_t index = idx.load();
    if (index < v.size()) {
        v[index] = 2;
        idx.fetch_add(1);
    }

    std::cout << "render end\n";

    ping_lock_flag.test_and_set(std::memory_order_acquire);
    ping_lock_flag.notify_one();

    // Some hard work after locking (epilogue)
    // ...
}

int main()
{
    std::jthread update_thread([&]() { while (idx.load() < v.size()) { update(); } });
    std::jthread render_thread([&]() { while (idx.load() < v.size()) { render(); } });

    ping_lock_flag.test_and_set(std::memory_order_acquire);

    update_thread.join();
    render_thread.join();

    for (auto& value : v) {
        std::cout << value << ", ";
    }

    std::cout << '\n';
}

It works, but are there any issues I might have missed?

Update 2

I found an issue with the approach suggested above. It enforces a "hard ping-pong": render can pass only after update, and vice versa. There is no chance of getting update-update-render in case update is faster. It seems that I have to rework the requirements. The key point is that if update is N times faster than render, I want `update` to pass approximately N times more often than `render`, but still give `render` a chance when it is ready.

Update 3

This solution should work:

#include <atomic>
#include <chrono>
#include <thread>
#include <iostream>
#include <vector>

std::atomic_flag render_update_lock = ATOMIC_FLAG_INIT;
std::atomic_flag render_thread_ready = ATOMIC_FLAG_INIT;
std::atomic_int idx = 0;
std::vector<int> v(10);

using namespace std::chrono_literals;

void update()
{
    if (!render_thread_ready.test()) {
        render_update_lock.wait(true, std::memory_order_relaxed);
        render_update_lock.test_and_set(std::memory_order_acquire);

        std::cout << "update start\n";

        std::size_t index = idx.load();
        if (index < v.size()) {
            v[index] = 1;
            idx.fetch_add(1);
        }

        std::cout << "update end\n";

        render_update_lock.clear(std::memory_order_release);
        render_update_lock.notify_one();
    }

    std::this_thread::sleep_for(1us);
}

void render()
{
    render_thread_ready.test_and_set(std::memory_order_acquire);

    render_update_lock.wait(true, std::memory_order_relaxed);
    render_update_lock.test_and_set(std::memory_order_acquire);

    render_thread_ready.clear(std::memory_order_release);

    std::cout << "render start\n";

    std::size_t index = idx.load();
    if (index < v.size()) {
        v[index] = 2;
        idx.fetch_add(1);
    }

    render_update_lock.clear(std::memory_order_release);
    render_update_lock.notify_one();

    std::cout << "render end\n";

    std::this_thread::sleep_for(2us);
}

int main()
{
    std::jthread update_thread([&]() { while (idx.load() < v.size()) { update(); } });
    std::jthread render_thread([&]() { while (idx.load() < v.size()) { render(); } });

    update_thread.join();
    render_thread.join();

    for (auto& value : v) {
        std::cout << value << ", ";
    }

    std::cout << '\n';
}

I have not tested it hard, though, so I may have missed some potential issues.

Upvotes: 0

Views: 139

Answers (1)

Mattia Piras

Reputation: 86

You can use condition variables, which are synchronization primitives that work paired with mutexes. The idea is that inside a critical section, a thread can wait on a condition variable until a given predicate (or condition) is satisfied. The thread will be notified by another thread when it can execute again. In this case you can use the condition variable methods wait and notify_one to alternate the two threads' execution. Whenever a thread waits on a condition variable, the mutex it holds is released, which solves the problem of one thread constantly holding the mutex over the other one.

This is also preferable over atomic variables, because it involves passive (blocking) waiting rather than busy-waiting.

Since we keep the critical section (mutex), there is also no need for atomic variables to guarantee consistency.

#include <thread>
#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>

std::mutex mtx;                          // Mutex to wait with cond_vars
std::condition_variable cv_ping;         // Condition variable for thread1
std::condition_variable cv_pong;         // Condition variable for thread2
std::size_t idx = 0;                     // Shared index for the vector
std::vector<int> v(10);

void update()
{
    // Wait until it's thread 1's turn to run
    std::unique_lock<std::mutex> lock(mtx);
    cv_ping.wait(lock, []() { return idx % 2 == 0; });

    std::cout << "update start\n";

    // If there is room, update the vector and increment the index
    if (idx < v.size()) {
        v[idx] = 1;
        idx++;
    }

    std::cout << "update end\n";

    // Notify the second thread that it's its turn
    cv_pong.notify_one();
}

void render()
{
    // Wait until it's thread 2's turn to run
    std::unique_lock<std::mutex> lock(mtx);
    cv_pong.wait(lock, []() { return idx % 2 == 1; });

    std::cout << "render start\n";

    // If there is room, update the vector and increment the index
    if (idx < v.size()) {
        v[idx] = 2;
        idx++;
    }

    std::cout << "render end\n";

    // Notify the first thread that it's its turn
    cv_ping.notify_one();
}

// Reading idx outside the mutex would be a data race, so the loop
// condition is checked under the lock.
bool has_work()
{
    std::lock_guard<std::mutex> lock(mtx);
    return idx < v.size();
}

int main()
{
    // Start two threads that will run their respective functions in a loop
    std::thread update_thread([&]() { while (has_work()) { update(); } });
    std::thread render_thread([&]() { while (has_work()) { render(); } });

    // Initially allow thread1 to run first
    {
        std::lock_guard<std::mutex> lock(mtx);
        cv_ping.notify_one();
    }

    // Wait for both threads to finish
    update_thread.join();
    render_thread.join();

    //Cleanup
}

From this approach you can easily extend the ping-pong to an arbitrary number of alternating threads. Use one condition variable per thread; holding them in a vector may be easiest. Whenever a given thread i (which waits on the condition variable at index i) is done executing, it notifies the thread at index (i + 1) % vector_length (modulo the length of the vector of condition variables, so that the last thread notifies the first). The condition variable semantics guarantee that a thread that wakes up has to first acquire the mutex, so mutual exclusion over the critical section is preserved.

A much more straightforward approach would be to use only one condition variable and have a thread call notify_all to wake up all the threads suspended on it. After all threads wake up, all of them besides one (the one whose turn it really is) will go back to waiting. Despite its simplicity, I wouldn't suggest this, as it creates a lot more contention on the mutex, while the previous scheme ensures that contention can only happen between one pair of threads at a time (so it is a lot smaller).

Hope I explained myself clearly :)

Upvotes: 1
