Reputation: 2917
We have some shared resource: a memory pool, thread-unsafe API, take your pick. We would like to control access to said resource via an ASIO strand. All routines accessing the resource should run on that strand.
We're also using C++20 coroutines and enjoy the illusion of sequential execution they provide.
When accessing the shared resource we would like to suspend the coroutine with co_await, switch to the blessed strand, do whatever with the resource, and then return back to the coroutine on its native executor.
Caveats:
- We don't want the manual "dispatch trick", because the ergonomics are bad and it's a race condition waiting to happen, i.e.:

    auto s1 = bind_executor(strand, asio::deferred);
    co_await asio::dispatch(s1);
    // Access shared resource
    co_await asio::dispatch(asio::deferred);

- We don't want to return asio::awaitable and possibly allocate a coroutine frame for operations that don't ask for one (via use_awaitable).

This is what I hacked up this morning. Obviously it's not great (not using concepts, doesn't forward arguments, no return values allowed, etc.), but it illustrates what I'm going for:
(Godbolt)
#include <boost/asio.hpp>
#include <atomic>
#include <functional>
#include <print>
namespace asio = boost::asio;

static std::atomic_int tid_gen;
thread_local int const tid = tid_gen++;

inline void out(auto const& msg) { std::print("T{:x} {}\n", tid, msg); }

template <typename F, typename Ex, typename CT>
auto async_run_on(F f, Ex ex, CT&& token) {
    return asio::async_initiate<CT, void()>(
        [](auto handler, F f, Ex ex) {
            ex.dispatch(
                [=, handler = std::move(handler)]() mutable {
                    std::invoke(f);
                    handler.get_executor().execute(std::move(handler));
                },
                asio::get_associated_allocator(ex));
        },
        token, std::move(f), ex);
}
asio::awaitable<void> f(auto strand) {
    out("Main");
    co_await async_run_on([] { out("Strand"); }, strand, asio::deferred);
    out("Main again");
}

int main() {
    asio::io_context io;
    asio::thread_pool tp(1);
    co_spawn(io, f(make_strand(tp)), asio::detached);
    io.run();
}
It's hopefully self-evident that we could expand this to build callable awaitables that always run on a given executor and return to wherever.
How is this use case supposed to work?
I am very bad at ASIO, the worst. There's also zero chance I'm the first to run into this problem. This makes me immensely suspicious of any solution I come up with. Coroutines and strands are ASIO building blocks, and I'm no sehe or ChrisK, I'm not the person figuring out how these puzzle pieces go together.
But Google and SO and the ASIO docs are all weirdly silent about this. There is very little cross-pollination between the examples given for strands and the examples given for coroutines.
Are we supposed to be co_await'ing strand-bound deferred operations each time, or is there a free function I completely missed somewhere?
Upvotes: 3
Views: 348
Reputation: 392833
I really appreciate the lucid introduction in your first two paragraphs. They focus on functional goals.
Then, suddenly there comes a stark switch:
When accessing the shared resource we would like to suspend the coroutine with co_await, switch to the blessed strand, do whatever with the resource, and then return back to the coroutine on its native executor.
Suddenly we left the realm of functional requirements and interface design for implementation mechanics. Luckily your excellent example restores balance by actually providing an abstraction, async_run_on, which is close to what I was going to suggest.
Going back to the functional requirements: "a memory pool, thread-unsafe API, take your pick[...] All routines accessing the resource should run on that strand."
If I were tasked with designing an Asio interface to that resource, I'd model it as an IO object. The IO object should bind to its designated executor (which in this case would usually be a strand), and the member initiation functions should dispatch to it (like async_run_on, except the executor is no longer an argument but a member).
So, perhaps similar to your code, I'd make something like:
template <typename Executor> struct base_resource {
    using executor_type = Executor;
    executor_type get_executor() const { return ex_; }

    base_resource(Executor ex) : ex_(std::move(ex)) {}

    template <typename CT> auto async_foo(CT&& token) {
        return asio::async_initiate<CT, void()>(
            [this](auto handler) {
                trace("initiating");
                asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); });
            },
            token);
    }

  private:
    void do_foo(auto handler) {
        trace("doing foo");
        // complete with handler; dispatch to bound executor
        asio::dispatch(std::move(handler));
    }

    Executor ex_;
};
Now you can use it without leaking implementation details:
using Resource = base_resource<asio::any_io_executor>;

asio::awaitable<void> f(Resource& res) {
    trace("f invoke resource");
    co_await res.async_foo(asio::deferred);
    trace("f back");
}
If you want, you can still force the continuation to stay on the resource's executor, though:
    trace("f force stay on resource strand: ");
    co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred));
    trace("f (strand should have stuck)");
    co_await post(asio::deferred); // relinquish strand
    trace("f back");
#include <boost/asio.hpp>
#include <fmt/core.h>
#include <atomic>
#include <optional>
#include <string_view>
namespace asio = boost::asio;
using namespace std::chrono_literals;

static std::atomic_int tid_gen;
thread_local int const tid = tid_gen++;

std::optional<asio::any_io_executor> g_strand;

static inline void trace(auto const& msg) {
    std::string_view tag = "other";
    if (auto* s = g_strand->target<asio::strand<asio::io_context::executor_type>>();
        s && s->running_in_this_thread())
        tag = "on io_context strand";
    if (auto* s = g_strand->target<asio::strand<asio::thread_pool::executor_type>>();
        s && s->running_in_this_thread())
        tag = "on thread_pool strand";
    fmt::print("T{:x} {} [{}]\n", tid, tag, msg);
}

template <typename Executor> struct base_resource {
    using executor_type = Executor;
    executor_type get_executor() const { return ex_; }

    base_resource(Executor ex) : ex_(std::move(ex)) {}

    template <typename CT> auto async_foo(CT&& token) {
        return asio::async_initiate<CT, void()>(
            [this](auto handler) {
                trace("initiating");
                asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); });
            },
            token);
    }

  private:
    void do_foo(auto handler) {
        trace("doing foo");
        // complete with handler; dispatch to bound executor
        asio::dispatch(std::move(handler));
    }

    Executor ex_;
};

using Resource = base_resource<asio::any_io_executor>;

asio::awaitable<void> f(Resource& res) {
    trace("f invoke resource");
    co_await res.async_foo(asio::deferred);
    trace("f back");

    trace("f force stay on resource strand: ");
    co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred));
    trace("f (strand should have stuck)");
    co_await post(asio::deferred); // relinquish strand
    trace("f back");
}

int main() {
    trace("Main");
    asio::io_context io;
    asio::thread_pool tp(1);

    g_strand = asio::make_strand(tp);
    Resource shared_res{*g_strand};

    co_spawn(io, f(shared_res), asio::detached);

    io.run();
    tp.join();
    g_strand.reset();
    trace("Done");
}
Printing
T0 other [Main]
T0 other [f invoke resource]
T0 other [initiating]
T1 on thread_pool strand [doing foo]
T0 other [f back]
T0 other [f force stay on resource strand: ]
T0 other [initiating]
T1 on thread_pool strand [doing foo]
T1 on thread_pool strand [f (strand should have stuck)]
T0 other [f back]
T0 other [Done]
Upvotes: 4