Takatoshi Kondo
Takatoshi Kondo

Reputation: 3540

Is it safe that the one coroutine function working on multiple strand/thread?

Motivation

I'm writing a message broker using C++20 coroutine. The broker has some sessions (connections). The broker deliver's message as follows:

sender(ss)       broker  receiver(sr)
   |               |           |
   | message       |           |
   |-------------->|           |
   |               | message   |
   |               |---------->|
   |               |           |

The delivery process is triggered by sender's session (ss). During the process, accessing receiver session (ss) is required. ss and sr could woking on the different thread and different strand. How to switch the strand well?

Minimal code example to demonstrate the situation

#include <boost/asio.hpp>
#include <iostream>
#include <syncstream>
#include <thread>
#include <cassert>

namespace as = boost::asio;

template <typename Executor>
struct session {
    session(Executor exe):str_{exe} {}
    void access() const {
        std::osyncstream(std::cout) << std::this_thread::get_id() << " access()" << std::endl;
        assert(str_.running_in_this_thread());
    }
    as::strand<Executor> str_;
};

int main() {
    as::io_context iocs;
    as::io_context iocr;
    auto guards = as::make_work_guard(iocs.get_executor());
    auto guardr = as::make_work_guard(iocr.get_executor());
    session ss{iocs.get_executor()};
    session sr{iocr.get_executor()};

    co_spawn(
        ss.str_,  // sender's strand
        [&] () -> as::awaitable<void> {
            std::osyncstream(std::cout) 
                << std::this_thread::get_id() 
                << " sender start deliver" 
                << std::endl;
            // need to access to receiver's resource to deliver
#if 0
            sr.access(); // assertion failed because this coroutine is
                         // working on sender's strand, not receiver's one
#endif
            // so dispatch to the receiver's strand
            co_await as::dispatch(sr.str_, as::use_awaitable);
#if 0
                         // B
            sr.access(); // assertion failed because this coroutine is
                         // working on sender's strand, not receiver's one
#endif
            // so dispatch to the receiver's strand
            co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));

            sr.access(); // working on receiver's thread. 
                         // This is expected behavior. 

            guards.reset();
            guardr.reset();
            co_return;
        },
        as::detached
    );
    std::thread ts {
        [&] {
            std::osyncstream(std::cout) << std::this_thread::get_id() << " sender thread" << std::endl;
            iocs.run();
        }
    };
    std::thread tr {
        [&] {
            std::osyncstream(std::cout) << std::this_thread::get_id() << " receiver thread" << std::endl;
            iocr.run();
        }
    };
    ts.join();
    tr.join();
}

godbolt link: https://godbolt.org/z/z8r6fYjj3

session has the strand str_ and access() function. access() function expects the function is called on the str_ to eliminate resource lock.

co_spawn itself is working of sender's strand.

What I tried

Direct call receiver's access()

It caused assertion failure. It is ovious because the code is working on sender's strand, not receiver's one.

Use dispatch with receiver's strand as the first parameter (Executor)

It still caused assertion failure. It is not clear for me. I guess that the process is switched to receiver's strand but co_await switched back to the sender's strand because co_spawn's executor is sender's strand. It is used as default executor. That is just my guessing.

Use dispatch with bind_executor and its first argument (bind executor) is receiver's strand

It works as I expected.

            // so dispatch to the receiver's strand
            co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));

            sr.access(); // working on receiver's thread. 
                         // This is expected behavior. 

The part of code after co_await is working on receiver's strand even if co_spawn's executor is sender's strand.

However, I'm not sure it is safe. I suspect that it could be one apperence of undefined behavior.

If it is safe. I want to use this approach.

Another approach

I read the code example of char server.

https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/example/cpp20/coroutines/chat_server.cpp It is very interesting. The session has two coroutines that are reader() and writer(). And the writer()'s async_write() is triggered by cancel of the never fired timer. It is a bit tricky. It is nicely avoid continuous async_write() call before the aync process is completed using queue.

However, im my case, the session has already been have queing mechanism internally. So this trick is too much for me.

Summarize the question

Question1

Is the following approach is safe ? Am I missing something important?

co_spawn(
    base_strand,
    coroutine_function
);
as::awaitable<void>
coroutine_function() {
    // basically working on base_strand.
    // ...

    co_await some_async_func(..., bind_executor(strand1, as::use_awaitable));
    // this part is working on strand1
    // ...

    co_await some_async_func(..., bind_executor(strand2, as::use_awaitable));
    // this part is working on strand2
    // ...

    co_await some_async_func(..., as::use_awaitable);
    // this part is working on base_strand
    // ...
}

Question2

co_await as::dispatch(sr.str_, as::use_awaitable);
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));

Why the first code isn't working on sr.str_ ?

Upvotes: 3

Views: 233

Answers (1)

sehe
sehe

Reputation: 392833

Q1 - Yes it is safe for Asio. A coroutine is an implicit strand because resumption is a linear chain of handlers. You can run them anywhere, including on strands.

Q2 - This is an oft-occurring confusion, e.g. Using a boost asio strand as a 'mutex' does not work with coroutines

See also these explanations:

Also look at the newer asio::deferred instead of asio::use_awaitable if you can: it's more efficient and less coroutine-specific.

Much simplified:

Live On Coliru

#include <boost/asio.hpp>
#include <cassert>
#include <iostream>
#include <syncstream>

namespace asio = boost::asio;

static int       tid_gen = 0;
thread_local int tid     = tid_gen++;

struct session {
    session(asio::any_io_executor exe) : strand_{exe} {}

    void access() const {
        std::osyncstream(std::cout) << tid << " access()" << std::endl;
        assert(strand_.running_in_this_thread());
    }

    asio::strand<asio::any_io_executor> strand_;
};

int main() {
    asio::thread_pool iocs(1), iocr(1);
    post(iocs, [] { std::osyncstream(std::cout) << tid << " sender thread" << std::endl; });
    post(iocr, [] { std::osyncstream(std::cout) << tid << " receiver thread" << std::endl; });
    session ss{iocs.get_executor()};
    session sr{iocr.get_executor()};

    co_spawn(
        ss.strand_, // sender's strand
        [&]() -> asio::awaitable<void> {
            std::osyncstream(std::cout) << tid << " sender start deliver" << std::endl;

            co_await asio::dispatch(asio::bind_executor(sr.strand_, asio::use_awaitable));

            sr.access();
            co_return;
        },
        asio::detached);

    iocs.join();
    iocr.join();
}

Printing e.g.

1 receiver thread
0 sender thread
0 sender start deliver
1 access()

Upvotes: 2

Related Questions