Reputation: 3540
I'm writing a message broker using C++20 coroutine. The broker has some sessions (connections). The broker deliver's message as follows:
sender(ss) broker receiver(sr)
| | |
| message | |
|-------------->| |
| | message |
| |---------->|
| | |
The delivery process is triggered by sender's session (ss). During the process, accessing receiver session (ss) is required. ss and sr could woking on the different thread and different strand. How to switch the strand well?
#include <boost/asio.hpp>
#include <iostream>
#include <syncstream>
#include <thread>
#include <cassert>
namespace as = boost::asio;
template <typename Executor>
struct session {
session(Executor exe):str_{exe} {}
void access() const {
std::osyncstream(std::cout) << std::this_thread::get_id() << " access()" << std::endl;
assert(str_.running_in_this_thread());
}
as::strand<Executor> str_;
};
int main() {
as::io_context iocs;
as::io_context iocr;
auto guards = as::make_work_guard(iocs.get_executor());
auto guardr = as::make_work_guard(iocr.get_executor());
session ss{iocs.get_executor()};
session sr{iocr.get_executor()};
co_spawn(
ss.str_, // sender's strand
[&] () -> as::awaitable<void> {
std::osyncstream(std::cout)
<< std::this_thread::get_id()
<< " sender start deliver"
<< std::endl;
// need to access to receiver's resource to deliver
#if 0
sr.access(); // assertion failed because this coroutine is
// working on sender's strand, not receiver's one
#endif
// so dispatch to the receiver's strand
co_await as::dispatch(sr.str_, as::use_awaitable);
#if 0
// B
sr.access(); // assertion failed because this coroutine is
// working on sender's strand, not receiver's one
#endif
// so dispatch to the receiver's strand
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));
sr.access(); // working on receiver's thread.
// This is expected behavior.
guards.reset();
guardr.reset();
co_return;
},
as::detached
);
std::thread ts {
[&] {
std::osyncstream(std::cout) << std::this_thread::get_id() << " sender thread" << std::endl;
iocs.run();
}
};
std::thread tr {
[&] {
std::osyncstream(std::cout) << std::this_thread::get_id() << " receiver thread" << std::endl;
iocr.run();
}
};
ts.join();
tr.join();
}
godbolt link: https://godbolt.org/z/z8r6fYjj3
session
has the strand str_
and access()
function. access()
function expects the function is called on the str_
to eliminate resource lock.
co_spawn
itself is working of sender's strand.
It caused assertion failure. It is ovious because the code is working on sender's strand, not receiver's one.
It still caused assertion failure. It is not clear for me. I guess that the process is switched to receiver's strand but co_await switched back to the sender's strand because co_spawn's executor is sender's strand. It is used as default executor. That is just my guessing.
It works as I expected.
// so dispatch to the receiver's strand
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));
sr.access(); // working on receiver's thread.
// This is expected behavior.
The part of code after co_await is working on receiver's strand even if co_spawn's executor is sender's strand.
However, I'm not sure it is safe. I suspect that it could be one apperence of undefined behavior.
If it is safe. I want to use this approach.
I read the code example of char server.
https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/example/cpp20/coroutines/chat_server.cpp It is very interesting. The session has two coroutines that are reader() and writer(). And the writer()'s async_write() is triggered by cancel of the never fired timer. It is a bit tricky. It is nicely avoid continuous async_write() call before the aync process is completed using queue.
However, im my case, the session has already been have queing mechanism internally. So this trick is too much for me.
Is the following approach is safe ? Am I missing something important?
co_spawn(
base_strand,
coroutine_function
);
as::awaitable<void>
coroutine_function() {
// basically working on base_strand.
// ...
co_await some_async_func(..., bind_executor(strand1, as::use_awaitable));
// this part is working on strand1
// ...
co_await some_async_func(..., bind_executor(strand2, as::use_awaitable));
// this part is working on strand2
// ...
co_await some_async_func(..., as::use_awaitable);
// this part is working on base_strand
// ...
}
co_await as::dispatch(sr.str_, as::use_awaitable);
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));
Why the first code isn't working on sr.str_
?
Upvotes: 3
Views: 233
Reputation: 392833
Q1 - Yes it is safe for Asio. A coroutine is an implicit strand because resumption is a linear chain of handlers. You can run them anywhere, including on strands.
Q2 - This is an oft-occurring confusion, e.g. Using a boost asio strand as a 'mutex' does not work with coroutines
See also these explanations:
Also look at the newer asio::deferred
instead of asio::use_awaitable
if you can: it's more efficient and less coroutine-specific.
Much simplified:
#include <boost/asio.hpp>
#include <cassert>
#include <iostream>
#include <syncstream>
namespace asio = boost::asio;
static int tid_gen = 0;
thread_local int tid = tid_gen++;
struct session {
session(asio::any_io_executor exe) : strand_{exe} {}
void access() const {
std::osyncstream(std::cout) << tid << " access()" << std::endl;
assert(strand_.running_in_this_thread());
}
asio::strand<asio::any_io_executor> strand_;
};
int main() {
asio::thread_pool iocs(1), iocr(1);
post(iocs, [] { std::osyncstream(std::cout) << tid << " sender thread" << std::endl; });
post(iocr, [] { std::osyncstream(std::cout) << tid << " receiver thread" << std::endl; });
session ss{iocs.get_executor()};
session sr{iocr.get_executor()};
co_spawn(
ss.strand_, // sender's strand
[&]() -> asio::awaitable<void> {
std::osyncstream(std::cout) << tid << " sender start deliver" << std::endl;
co_await asio::dispatch(asio::bind_executor(sr.strand_, asio::use_awaitable));
sr.access();
co_return;
},
asio::detached);
iocs.join();
iocr.join();
}
Printing e.g.
1 receiver thread
0 sender thread
0 sender start deliver
1 access()
Upvotes: 2