nickelpro

Reputation: 2917

ASIO: co_await callable to be run on a strand

The Problem

We have some shared resource: a memory pool, thread-unsafe API, take your pick. We would like to control access to said resource via an ASIO strand. All routines accessing the resource should run on that strand.

We're also using C++20 coroutines and enjoy the illusion of sequential execution they provide.

When accessing the shared resource we would like to suspend the coroutine with co_await, switch to the blessed strand, do whatever with the resource, and then return back to the coroutine on its native executor.

Caveats:

auto s1 = bind_executor(strand, asio::deferred);
co_await asio::dispatch(s1);
// Access shared resource
co_await asio::dispatch(asio::deferred);

The Current Solution

This is what I hacked up this morning. It's obviously not great (no concepts, no argument forwarding, no return values, etc.), but it illustrates what I'm going for:

(Godbolt)

static std::atomic_int tid_gen;
thread_local int const tid = tid_gen++;

inline void out(auto const& msg) { std::print("T{:x} {}\n", tid, msg); }

template <typename F, typename Ex, typename CT>
auto async_run_on(F f, Ex ex, CT&& token) {
  return asio::async_initiate<CT, void()>(
      [](auto handler, F f, Ex ex) {
        ex.dispatch(
            [=, handler = std::move(handler)]() mutable {
              std::invoke(f);
              handler.get_executor().execute(std::move(handler));
            },
            asio::get_associated_allocator(ex));
      },
      token, std::move(f), ex);
}

asio::awaitable<void> f(auto strand) {
    out("Main");

    co_await async_run_on([](){ out("Strand"); }, strand, asio::deferred);
    
    out("Main again");
}

int main() {
    asio::io_context io;
    asio::thread_pool tp(1);

    co_spawn(io, f(make_strand(tp)), asio::detached);
    io.run();
}

It's hopefully self-evident that we could expand this to build callable awaitables that always run on a given executor and return to wherever.

The Question

How is this use case supposed to work?

I am very bad at ASIO, the worst. There's also zero chance I'm the first to run into this problem. This makes me immensely suspicious of any solution I come up with. Coroutines and strands are ASIO building blocks, and I'm no sehe or ChrisK, I'm not the person figuring out how these puzzle pieces go together.

But Google and SO and the ASIO docs are all weirdly silent about this. There is very little cross-pollination between the examples given for strands and the examples given for coroutines.

Are we supposed to be co_await'ing strand-bound deferred operations each time or is there a free function I completely missed somewhere?

Upvotes: 3

Views: 348

Answers (1)

sehe

Reputation: 392833

I really appreciate the lucid introduction in your first two paragraphs. They focus on functional goals.

Then, suddenly there comes a stark switch:

"When accessing the shared resource we would like to suspend the coroutine with co_await, switch to the blessed strand, do whatever with the resource, and then return back to the coroutine on its native executor."

Here we leave the realm of functional requirements and interface design for implementation mechanics. Luckily, your excellent example restores the balance by providing an abstraction, async_run_on, which is close to what I was going to suggest.

Going back to the functional requirements: "a memory pool, thread-unsafe API, take your pick[...] All routines accessing the resource should run on that strand."

If I were tasked with designing an Asio interface to that resource, I'd model it as an IO object. The IO object should bind to its designated executor (which in this case would usually be a strand), and its member initiation functions should dispatch to that executor (like async_run_on, except that the executor is no longer an argument but a member).

So, perhaps similar to your code, I'd make something like:

template <typename Executor> struct base_resource {
    using executor_type = Executor;
    executor_type get_executor() const { return ex_; }

    base_resource(Executor ex) : ex_(std::move(ex)) {}

    template <typename CT> auto async_foo(CT&& token) {
        return asio::async_initiate<CT, void()>(
            [this](auto handler) {
                trace("initiating");
                asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); });
            },
            token);
    }

  private:
    void do_foo(auto handler) {
        trace("doing foo");
        // complete with handler; dispatch to bound executor
        asio::dispatch(std::move(handler));
    }

    Executor ex_;
};

Now you can use it without leaking implementation details:

using Resource = base_resource<asio::any_io_executor>;

asio::awaitable<void> f(Resource& res) {
    trace("f invoke resource");
    co_await res.async_foo(asio::deferred);
    trace("f back");
}

If you want, you can still force the continuation to stay on the resource's executor:

    trace("f force stay on resource strand: ");
    co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred));
    trace("f (strand should have stuck)");
    co_await post(asio::deferred); // relinquish strand
    trace("f back");

Full Demo

Live On Coliru

#include <boost/asio.hpp>
#include <fmt/core.h>
#include <atomic>
#include <optional>
#include <string_view>

namespace asio = boost::asio;
using namespace std::chrono_literals;

static std::atomic_int tid_gen;
thread_local int const tid = tid_gen++;

std::optional<asio::any_io_executor> g_strand;

static inline void trace(auto const& msg) {
    std::string_view tag = "other";

    // g_strand is empty before main() assigns it and after it is reset,
    // so only inspect it while it holds an executor
    if (g_strand) {
        if (auto* s = g_strand->target<asio::strand<asio::io_context::executor_type>>();
            s && s->running_in_this_thread())
            tag = "on io_context strand";
        if (auto* s = g_strand->target<asio::strand<asio::thread_pool::executor_type>>();
            s && s->running_in_this_thread())
            tag = "on thread_pool strand";
    }

    fmt::print("T{:x} {} [{}]\n", tid, tag, msg);
}

template <typename Executor> struct base_resource {
    using executor_type = Executor;
    executor_type get_executor() const { return ex_; }

    base_resource(Executor ex) : ex_(std::move(ex)) {}

    template <typename CT> auto async_foo(CT&& token) {
        return asio::async_initiate<CT, void()>(
            [this](auto handler) {
                trace("initiating");
                asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); });
            },
            token);
    }

  private:
    void do_foo(auto handler) {
        trace("doing foo");
        // complete with handler; dispatch to bound executor
        asio::dispatch(std::move(handler));
    }

    Executor ex_;
};

using Resource = base_resource<asio::any_io_executor>;

asio::awaitable<void> f(Resource& res) {
    trace("f invoke resource");
    co_await res.async_foo(asio::deferred);
    trace("f back");

    trace("f force stay on resource strand: ");
    co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred));
    trace("f (strand should have stuck)");
    co_await post(asio::deferred); // relinquish strand
    trace("f back");
}

int main() {
    trace("Main");
    asio::io_context io;

    asio::thread_pool tp(1);
    g_strand = asio::make_strand(tp);

    Resource shared_res{*g_strand};
    co_spawn(io, f(shared_res), asio::detached);

    io.run();
    tp.join();

    g_strand.reset();
    trace("Done");
}

Printing

T0 other [Main]
T0 other [f invoke resource]
T0 other [initiating]
T1 on thread_pool strand [doing foo]
T0 other [f back]
T0 other [f force stay on resource strand: ]
T0 other [initiating]
T1 on thread_pool strand [doing foo]
T1 on thread_pool strand [f (strand should have stuck)]
T0 other [f back]
T0 other [Done]

Links to related questions:

Some of these will have a wealth of perspective and details that can shape your design decisions:

Upvotes: 4
