I'm trying to implement a protocol over TCP sockets using the (Boost) Asio library.
I need to be able to asynchronously wait for different kinds of messages received on the socket. For this I keep a std::list of std::pairs, each holding a predicate (to determine whether a received message is one we are waiting for) and an asio::any_completion_handler that is invoked when a message is received and the corresponding predicate returns true.
This implementation seems to work correctly when I use asio::use_awaitable, but as a next step I would like to add a custom timeout token (i.e. the last section of https://cppalliance.org/asio/2023/01/02/Asio201Timeouts.html).
I'm able to use this custom timeout token with Asio async operations like async_read_until, but when I want to use it with my own custom async operation I can't figure out how to bind the cancellation slot correctly...
Here is the code for the custom timeout token:
#include <asio.hpp>
#include <chrono>
#include <type_traits>
#include <utility>
struct timeout_provider;
// that's our completion token with the timeout attached
template<typename Token>
struct with_timeout {
timeout_provider * provider;
Token token;
};
// this is the timeout source
struct timeout_provider {
timeout_provider( asio::any_io_executor exec )
: timer{exec, std::chrono::steady_clock::time_point::max()}
{}
asio::steady_timer timer;
std::chrono::milliseconds timeout = std::chrono::milliseconds(10000);
asio::cancellation_slot cancellation_slot;
asio::cancellation_signal cancellation_signal;
asio::cancellation_type cancellation_type{asio::cancellation_type::terminal};
~timeout_provider() {
if (cancellation_slot.is_connected())
cancellation_slot.clear();
}
// to use it
template<typename Token>
auto operator()(Token && token)
{
return with_timeout<std::decay_t<Token>>{
this, std::forward<Token>(token)
};
}
// set up the timer and get ready to trigger
void arm()
{
timer.expires_after(timeout);
if (cancellation_slot.is_connected()) {
cancellation_slot.assign([this](asio::cancellation_type ct){
cancellation_signal.emit(ct);
});
}
timer.async_wait( [this](asio::error_code ec){
if (!ec) {
cancellation_signal.emit( cancellation_type );
}
});
}
};
template<typename Handler>
struct with_timeout_binder
{
timeout_provider * provider;
Handler handler;
template<typename ...Args>
void operator()(Args && ... args) {
// cancel the timer, we're done!
provider->timer.cancel();
std::move(handler)(std::forward<Args>(args)...);
}
};
namespace asio {
// This is the class to specialize when implementing a completion token.
template<typename InnerToken, typename ... Signatures>
struct async_result<with_timeout<InnerToken>, Signatures...>
{
//using return_type = typename async_result<InnerToken, Signatures...>::return_type;
// this wrapper goes around the inner initiation, because we need to capture the handler's cancellation slot
template<typename Initiation>
struct init_wrapper {
Initiation initiation;
timeout_provider * provider;
// this forwards to the initiation and lets us access the actual handler.
template <typename Handler, typename... Args>
void operator()( Handler && handler, Args && ... args) {
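    // look up the cancellation slot associated with the final handler (for
    // use_awaitable this is the coroutine's per-operation slot), so that arm()
    // can chain upstream cancellation into our own cancellation_signal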
auto sl = asio::get_associated_cancellation_slot(handler);
if (sl.is_connected()) {
provider->cancellation_slot = sl;
}
provider->arm();
std::move(initiation)(
with_timeout_binder<std::decay_t<Handler>>{
provider,
std::forward<Handler>(handler)
}, std::forward<Args>(args)...);
}
};
// the actual initiation
template<typename Initiation, typename RawToken, typename ... Args>
static auto initiate(Initiation && init, RawToken && token, Args && ... args) {
return async_result<InnerToken, Signatures...>::initiate(
// here we wrap the initiation so we enable the above injection
init_wrapper<std::decay_t<Initiation>>(std::forward<Initiation>(init), token.provider),
std::move(token.token),
std::forward<Args>(args)...
);
}
};
// forward the other associators, such as allocator & executor
template <template <typename, typename> class Associator, typename T, typename DefaultCandidate>
struct associator<Associator, with_timeout_binder<T>, DefaultCandidate> {
typedef typename Associator<T, DefaultCandidate>::type type;
static type get(const with_timeout_binder<T>& b, const DefaultCandidate& c = DefaultCandidate()) noexcept {
return Associator<T, DefaultCandidate>::get(b.handler, c);
}
};
// set the slot explicitly
template <typename T, typename CancellationSlot1>
struct associated_cancellation_slot< with_timeout_binder<T>, CancellationSlot1 > {
typedef asio::cancellation_slot type;
static type get(const with_timeout_binder<T>& b, const CancellationSlot1& = CancellationSlot1()) noexcept {
return b.provider->cancellation_signal.slot();
}
};
}
And then the code for the client TCP socket:
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <string>
using asio::ip::tcp;
class client : public std::enable_shared_from_this<client> {
public:
using predicate = std::function<bool( const std::string & )>;
using handler = asio::any_completion_handler<void( asio::error_code, std::shared_ptr<std::string> )>;
using predicate_handler = std::pair< predicate, handler >;
client( tcp::socket socket )
: m_socket( std::move(socket) )
{
asio::co_spawn( m_socket.get_executor(), [&]() -> asio::awaitable<void> {
std::string buffer;
for(;;) {
auto len = co_await asio::async_read_until( m_socket, asio::dynamic_buffer( buffer ), "\n", asio::use_awaitable );
auto msg = std::make_shared< std::string >( buffer.substr( 0, len ) );
for( auto it = m_predicate_handlers.begin() ; it != m_predicate_handlers.end() ; ) {
if( it->first( *msg ) ) {
asio::post( m_socket.get_executor(), [ msg, handler = std::move( it->second ) ]() mutable {
std::move( handler )( asio::error_code{}, msg );
});
it = m_predicate_handlers.erase( it );
}
else {
it++;
}
}
buffer.erase( 0, len );
}
}, asio::detached );
}
template< asio::completion_token_for<void(std::error_code, const std::shared_ptr<std::string>)> CompletionToken >
auto async_wait_until( predicate pred, CompletionToken &&token ) {
return asio::async_initiate< CompletionToken, void( std::error_code, const std::shared_ptr<std::string> ) >(
[&]( asio::completion_handler_for<void(std::error_code, const std::shared_ptr<std::string>)> auto&& handler, predicate pred ) mutable {
m_predicate_handlers.push_back( std::make_pair(
pred,
std::forward<decltype(handler)>(handler)
)
);
}, token, pred
);
}
private:
tcp::socket m_socket;
std::list< predicate_handler > m_predicate_handlers;
};
And finally the main app:
int main(int argc, char *argv[]) {
asio::io_context ctx;
asio::co_spawn( ctx, [&]() -> asio::awaitable<void> {
tcp::acceptor acceptor(ctx, asio::ip::tcp::endpoint( asio::ip::tcp::v4(), 8000 ) );
for(;;) {
auto socket = co_await acceptor.async_accept( asio::use_awaitable );
auto cl = std::make_shared< client >( std::move( socket ) );
asio::co_spawn( ctx, [cl]() -> asio::awaitable<void> {
for(;;){
auto msg = co_await cl->async_wait_until(
[]( const std::string &msg ) {
return msg.starts_with("REQUEST");
},
asio::use_awaitable
);
std::cout << "Request received: " << *msg << std::endl;
}
}, asio::detached);
asio::co_spawn( ctx, [&,cl]() -> asio::awaitable<void> {
timeout_provider timeout(ctx.get_executor());
for(;;){
auto [ec, msg ] = co_await cl->async_wait_until(
[]( const std::string &msg ) {
return msg.starts_with("NOTIFICATION");
},
timeout( asio::as_tuple( asio::use_awaitable ) )
);
if( !ec )
std::cout << "Notification received: " << *msg << std::endl;
else
std::cout << "Notification not received: " << ec.message() << std::endl;
}
}, asio::detached);
}
}, asio::detached );
ctx.run();
return 0;
}
After painstakingly completing the answer to the actual question, I set about implementing it the simplest way I would imagine doing it myself:
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <map>
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
using task = asio::awaitable<void>;
class client {
public:
using predicate = std::function<bool(std::string const&)>;
using order_sig = void(error_code, std::string);
using handler = asio::any_completion_handler<order_sig>;
client(tcp::socket& socket) : m_socket(socket) {}
template <asio::completion_token_for<order_sig> CompletionToken = asio::deferred_t>
auto async_order(predicate pred, CompletionToken&& token = {}) {
return asio::async_initiate<CompletionToken, order_sig>(
[&](auto handler, predicate pred) mutable {
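            // honor the handler's associated cancellation slot: this is the
            // part the question's async_wait_until left out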
auto s = get_associated_cancellation_slot(handler);
auto id = m_orders.insert({orderid++, Order(pred, std::move(handler))}).first->first;
if (s.is_connected())
s.assign([this, id](asio::cancellation_type) { cancel(id); });
},
token, pred);
}
void cancel(unsigned id) {
if (auto it = m_orders.find(id); it != m_orders.end()) {
auto h = std::move(it->second.handler_);
m_orders.erase(it);
if (h)
std::move(h)(asio::error::operation_aborted, "");
}
}
task read_loop() {
auto ex = co_await asio::this_coro::executor;
try {
for (std::string buffer;
auto len = co_await async_read_until(m_socket, asio::dynamic_buffer(buffer), "\n"); //
) {
std::string const msg(buffer.substr(0, len - 1)); // remove the newline
buffer.erase(0, len);
std::erase_if(m_orders, [&](auto& kvp) {
if (auto& [id, order] = kvp; bool matched = order.pred_(msg)) {
post(ex, std::bind(std::move(order.handler_), error_code{}, msg));
return true;
} else return false;
});
}
} catch (boost::system::system_error const& se) {
std::cerr << "Read loop error: " << se.code().message() << std::endl;
}
for (auto& [id, order] : m_orders)
post(ex, std::bind(std::move(order.handler_), asio::error::operation_aborted, ""));
}
private:
tcp::socket& m_socket;
std::atomic_uint orderid = 0;
struct Order {
predicate pred_;
handler handler_;
};
using Pending = std::map<unsigned, Order>;
Pending m_orders;
};
task timer(auto d) {
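    // as_tuple folds the error_code into the result, so losing the || race
    // (operation_aborted) completes quietly instead of throwing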
co_await asio::steady_timer(co_await asio::this_coro::executor, d).async_wait(asio::as_tuple);
}
static bool is_request(std::string_view v) { return v.starts_with("REQUEST"); }
static bool is_notification(std::string_view v) { return v.starts_with("NOTIFICATION"); }
task request_loop(client& cl) {
for (;;)
std::cout << "Request received: " << co_await cl.async_order(is_request) << std::endl;
}
task notification_loop(client& cl) {
for (;;) {
auto r = co_await (cl.async_order(is_notification, asio::use_awaitable) || timer(10s));
if (r.index() == 0)
std::cout << "Notification received: " << std::get<0>(r) << std::endl;
else
std::cout << "Notification not received - timeout" << std::endl;
}
}
task session(tcp::socket s) {
auto m_ep = s.remote_endpoint();
std::cerr << "session started (" << m_ep << ")" << std::endl;
client cl(s);
co_await (cl.read_loop() || (request_loop(cl) && notification_loop(cl)));
std::cerr << "session ended (" << m_ep << ")" << std::endl;
}
task server(uint16_t port) {
auto ex = co_await asio::this_coro::executor;
for (tcp::acceptor acceptor(ex, {{}, port});;)
co_spawn(ex, session(co_await acceptor.async_accept(asio::use_awaitable)), asio::detached);
}
int main() {
asio::io_context ctx;
co_spawn(ctx, server(8000), asio::detached);
ctx.run();
}
I suspect things can be further simplified using channels, but I'll leave that as an exercise for the reader.
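For instance, a minimal sketch of the channel idea (a sketch of mine, not part of the answer; it assumes asio::experimental::channel, and the names are illustrative):
#include <boost/asio/experimental/channel.hpp>
// One channel per message category; the read loop would publish matched
// messages into it instead of storing predicate/handler pairs.
using msg_channel = asio::experimental::channel<void(error_code, std::string)>;
task consume_notifications(msg_channel& notifications) {
    for (;;) {
        // completes when the read loop does
        //   co_await notifications.async_send(error_code{}, msg, asio::use_awaitable);
        // and throws system_error when the channel is closed or cancelled
        auto msg = co_await notifications.async_receive(asio::use_awaitable);
        std::cout << "Notification received: " << msg << std::endl;
    }
}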
"I'm able to use this custom timeout token with asio async operation like async_read_until
" - I'm not seeing it in your code though?
First I patched up the code to be self-contained.
Note a few changes relating to Boost Asio vs. standalone Asio. You should be able to change
namespace asio = boost::asio; using boost::system::error_code;
into just:
using asio::error_code;
Other than that I made a few sanity fixups, like less repetition of the completion signature and being more hygienic with the shared message (shared_ptr<string const>).
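As an aside, a conditional-compilation sketch of that switch (MY_STANDALONE_ASIO is an illustrative macro of my own, not something Asio defines):
#ifdef MY_STANDALONE_ASIO
  #include <asio.hpp>
  using asio::error_code;          // standalone Asio aliases std::error_code
#else
  #include <boost/asio.hpp>
  namespace asio = boost::asio;
  using boost::system::error_code; // Boost.Asio uses Boost.System's error_code
#endif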
#include <boost/asio.hpp>
namespace asio = boost::asio;
using boost::system::error_code;
namespace Extension {
struct timeout_provider;
// that's our completion token with the timeout attached
template <typename Token> struct with_timeout {
timeout_provider* provider;
Token token;
};
// this is the timeout source
struct timeout_provider {
timeout_provider(asio::any_io_executor exec)
: timer{exec, std::chrono::steady_clock::time_point::max()} {}
asio::steady_timer timer;
std::chrono::milliseconds tt_total{10000};
asio::cancellation_slot cancellation_slot;
asio::cancellation_signal cancellation_signal;
asio::cancellation_type cancellation_type{asio::cancellation_type::terminal};
~timeout_provider() {
if (cancellation_slot.is_connected())
cancellation_slot.clear();
}
// to use it
template <typename Token> auto operator()(Token&& token) {
return with_timeout<std::decay_t<Token>>{this, std::forward<Token>(token)};
}
// set up the timer and get ready to trigger
void arm() {
timer.expires_after(tt_total);
if (cancellation_slot.is_connected()) {
cancellation_slot.assign(
[this](asio::cancellation_type ct) { cancellation_signal.emit(ct); });
}
timer.async_wait([this](error_code ec) {
if (!ec) {
// printf("Firing terminal\n");
cancellation_signal.emit(cancellation_type = asio::cancellation_type::terminal);
}
});
}
};
// the completion handler
template <typename Handler> struct with_timeout_binder {
timeout_provider* provider;
Handler handler;
template <typename... Args> void operator()(Args&&... args) {
// cancel the timer, we're done!
provider->timer.cancel();
std::move(handler)(std::forward<Args>(args)...);
}
};
} // namespace Extension
namespace boost::asio {
// This is the class to specialize when implementing a completion token.
template <typename InnerToken, typename... Signatures>
struct async_result<Extension::with_timeout<InnerToken>, Signatures...> {
using return_type = typename async_result<InnerToken, Signatures...>::return_type;
// this wrapper goes around the inner initiation, because we need to capture the handler's cancellation slot
template <typename Initiation> struct init_wrapper {
Initiation initiation;
Extension::timeout_provider* provider;
// this forwards to the initiation and lets us access the actual handler.
template <typename Handler, typename... Args> void operator()(Handler&& handler, Args&&... args) {
auto sl = asio::get_associated_cancellation_slot(handler);
if (sl.is_connected()) {
provider->cancellation_slot = sl;
}
provider->arm();
std::move(initiation)(
Extension::with_timeout_binder<std::decay_t<Handler>>{provider,
std::forward<Handler>(handler)},
std::forward<Args>(args)...);
}
};
// the actual initiation
template <typename Initiation, typename RawToken, typename... Args>
static auto initiate(Initiation&& init, RawToken&& token, Args&&... args) -> return_type {
return async_result<InnerToken, Signatures...>::initiate(
// here we wrap the initiation so we enable the above injection
init_wrapper<std::decay_t<Initiation>>(std::forward<Initiation>(init), token.provider),
std::move(token.token), std::forward<Args>(args)...);
}
};
// forward the other associators, such as allocator & executor
template <template <typename, typename> class Associator, typename T, typename DefaultCandidate>
struct associator<Associator, Extension::with_timeout_binder<T>, DefaultCandidate> {
typedef typename Associator<T, DefaultCandidate>::type type;
static type get(Extension::with_timeout_binder<T> const& b,
DefaultCandidate const& c = DefaultCandidate()) noexcept {
return Associator<T, DefaultCandidate>::get(b.handler, c);
}
};
// set the slot explicitly
template <typename T, typename CancellationSlot1>
struct associated_cancellation_slot<Extension::with_timeout_binder<T>, CancellationSlot1> {
typedef asio::cancellation_slot type;
static type get(Extension::with_timeout_binder<T> const& b,
CancellationSlot1 const& = CancellationSlot1()) noexcept {
return b.provider->cancellation_signal.slot();
}
};
} // namespace boost::asio
#include <iostream>
#include <list>
using asio::ip::tcp;
using Extension::timeout_provider;
class client : public std::enable_shared_from_this<client> {
public:
using predicate = std::function<bool(std::string const&)>;
using handler = asio::any_completion_handler<void(error_code, std::shared_ptr<std::string const>)>;
using predicate_handler = std::pair<predicate, handler>;
client(tcp::socket socket) : m_socket(std::move(socket)) {
asio::co_spawn(
m_socket.get_executor(),
[&]() -> asio::awaitable<void> {
std::string buffer;
for (;;) {
auto len = co_await asio::async_read_until(m_socket, asio::dynamic_buffer(buffer), "\n",
asio::use_awaitable);
if (len) {
auto msg = std::make_shared<std::string const>(
buffer.substr(0, len - 1)); // remove the newline
buffer.erase(0, len);
for (auto it = m_predicate_handlers.begin(); it != m_predicate_handlers.end();) {
if (it->first(*msg)) {
asio::post(m_socket.get_executor(),
[msg, handler = std::move(it->second)]() mutable {
std::move(handler)(error_code{}, msg);
});
it = m_predicate_handlers.erase(it);
} else {
it++;
}
}
}
}
},
asio::detached);
}
template <typename Sig = void(error_code, std::shared_ptr<std::string const>),
asio::completion_token_for<Sig> CompletionToken>
auto async_wait_until(predicate pred, CompletionToken&& token) {
return asio::async_initiate<CompletionToken, Sig>(
[&](asio::completion_handler_for<Sig> auto&& handler, predicate pred) mutable {
m_predicate_handlers.push_back(
std::make_pair(pred, std::forward<decltype(handler)>(handler)));
},
token, pred);
}
private:
tcp::socket m_socket;
std::list<predicate_handler> m_predicate_handlers;
};
int main() {
asio::io_context ctx;
asio::co_spawn(
ctx,
[&]() -> asio::awaitable<void> {
tcp::acceptor acceptor(ctx, {{}, 8000});
for (;;) {
std::cout << "Waiting for connection" << std::endl;
auto socket = co_await acceptor.async_accept(asio::use_awaitable);
std::cout << "Connection accepted: " << socket.remote_endpoint() << std::endl;
auto cl = std::make_shared<client>(std::move(socket));
asio::co_spawn(
ctx,
[cl]() -> asio::awaitable<void> {
for (;;) {
auto msg = co_await cl->async_wait_until(
[](std::string const& msg) { return msg.starts_with("REQUEST"); },
asio::use_awaitable);
std::cout << "Request received: " << *msg << std::endl;
}
},
asio::detached);
asio::co_spawn(
ctx,
[&, cl]() -> asio::awaitable<void> {
timeout_provider timeout(ctx.get_executor());
for (;;) {
auto [ec, msg] = co_await cl->async_wait_until(
[](std::string const& msg) { return msg.starts_with("NOTIFICATION"); },
timeout(asio::as_tuple(asio::use_awaitable)));
if (!ec)
std::cout << "Notification received: " << *msg << std::endl;
else
std::cout << "Notification not received: " << ec.message() << std::endl;
}
},
asio::detached);
}
},
asio::detached);
ctx.run();
}
Now, on to the real problem: you are storing a handler to "manually" complete later. That means cancellation is also something you have to handle "manually", and you don't.
First, let's refactor m_predicate_handlers to m_orders:
struct order {
predicate pred_;
handler handler_;
};
std::list<order> m_orders;
Now, let's extend using more cancellation slots:
struct order {
predicate pred_;
handler handler_;
asio::cancellation_slot slot_;
};
Now, hook them up when we store the order:
template <asio::completion_token_for<order_sig> CompletionToken>
auto async_order(predicate pred, CompletionToken&& token) {
return asio::async_initiate<CompletionToken, order_sig>(
[&](auto handler, predicate pred) mutable {
auto id = orderid++;
std::cerr << "initiating order #" << id << std::endl;
auto slot = asio::get_associated_cancellation_slot(handler); // before we move it!
auto& filed = m_orders.emplace_back(pred, std::move(handler));
if (slot.is_connected()) {
filed.slot_ = std::move(slot);
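                    // NB: capturing &filed below is only safe because std::list
                    // nodes have stable addresses; the final version switches
                    // to an associative container instead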
filed.slot_.assign([id, &orders = m_orders, &filed](asio::cancellation_type) {
std::cerr << "cancelling order #" << id << std::endl;
auto h = std::move(filed.handler_);
orders.remove_if([&](auto& order) { return &order == &filed; });
std::move(h)(asio::error::operation_aborted, nullptr);
});
}
},
token, pred);
}
That works for me. There are some remaining issues:

1. The timeout_provider destructor does not emit the cancellation signal. Perhaps timeout_provider could be a member of client so its lifetime would be controlled in a "sensible" location.
2. read_loop needs to cancel all pending orders when exiting.
3. The client read loop does not own the shared pointer to itself. This is tied to it starting from the constructor (which is conceptually incompatible with shared_from_this).

Fixing 1. and 2. makes things considerably more complex:
template <asio::completion_token_for<order_sig> CompletionToken>
auto async_order(predicate pred, CompletionToken&& token) {
return asio::async_initiate<CompletionToken, order_sig>(
[&](auto handler, predicate pred) mutable {
auto id = orderid++;
std::cerr << "initiating order #" << id << std::endl;
auto slot = asio::get_associated_cancellation_slot(handler); // before we move it!
auto& filed = m_orders
.insert({id, order(pred, std::move(handler))}) //
.first->second;
if (slot.is_connected()) {
filed.slot_ = std::move(slot);
std::shared_ptr<Pending> orders(shared_from_this(), &m_orders); // alias
filed.slot_.assign(
[id, weak = std::weak_ptr(orders), ex = m_socket.get_executor()] //
(asio::cancellation_type) {
if (auto orders = weak.lock()) {
if (auto it = orders->find(id); it != orders->end()) {
std::cerr << "cancelling order #" << id << std::endl;
auto [_, h, slot] = std::move(it->second);
slot.clear();
post(ex, std::bind(std::move(h), asio::error::operation_aborted, ""));
orders->erase(it);
} else {
std::cerr << "cancelling order #" << id << " (disappeared)" << std::endl;
}
} else {
std::cerr << "cancelling order #" << id << " (client disappeared)"
<< std::endl;
}
});
}
},
token, pred);
}
Other improvements include using durations instead of the hardcoded milliseconds type, and replacing the list with another node-based container (an associative one, to avoid relying on address comparisons, which are not reliable due to the ABA problem).

Fixing all of those except 3.:
#include <boost/asio.hpp>
#include <iostream>
#include <map>
namespace asio = boost::asio;
using boost::system::error_code;
using namespace std::chrono_literals;
namespace Extension {
struct timeout_provider;
// that's our completion token with the timeout attached
template <typename Token> struct with_timeout {
timeout_provider* provider;
Token token;
};
// this is the timeout source
struct timeout_provider {
int const id_ = [] {
static std::atomic_int idgen = 0;
return idgen++;
}();
using duration = std::chrono::steady_clock::duration;
timeout_provider(asio::any_io_executor exec, duration d = 10s) : timer{exec}, tt_total(d) {}
asio::steady_timer timer;
duration tt_total;
asio::cancellation_slot slot_;
asio::cancellation_signal sig_;
~timeout_provider() {
if (slot_.is_connected())
slot_.clear();
sig_.emit(asio::cancellation_type::terminal);
}
// to use it
template <typename Token> auto operator()(Token&& token) {
return with_timeout<std::decay_t<Token>>{this, std::forward<Token>(token)};
}
// set up the timer and get ready to trigger
void arm() {
timer.expires_after(tt_total);
if (slot_.is_connected()) {
slot_.assign([this](asio::cancellation_type ct) { sig_.emit(ct); });
}
timer.async_wait([this](error_code ec) {
if (!ec) {
sig_.emit(asio::cancellation_type::terminal);
}
});
}
};
// the completion handler
template <typename Handler> struct with_timeout_binder {
timeout_provider* provider;
Handler handler;
template <typename... Args> void operator()(Args&&... args) {
// cancel the timer, we're done!
provider->timer.cancel();
std::move(handler)(std::forward<Args>(args)...);
}
};
} // namespace Extension
namespace boost::asio {
// This is the class to specialize when implementing a completion token.
template <typename InnerToken, typename... Signatures>
struct async_result<Extension::with_timeout<InnerToken>, Signatures...> {
using return_type = typename async_result<InnerToken, Signatures...>::return_type;
// this wrapper goes around the inner initiation, because we need to capture the handler's cancellation slot
template <typename Initiation> struct init_wrapper {
Initiation initiation;
Extension::timeout_provider* provider;
// this forwards to the initiation and lets us access the actual handler.
template <typename Handler, typename... Args> void operator()(Handler&& handler, Args&&... args) {
auto sl = asio::get_associated_cancellation_slot(handler);
if (sl.is_connected()) {
provider->slot_ = sl;
}
provider->arm();
std::move(initiation)(
Extension::with_timeout_binder<std::decay_t<Handler>>{provider,
std::forward<Handler>(handler)},
std::forward<Args>(args)...);
}
};
// the actual initiation
template <typename Initiation, typename RawToken, typename... Args>
static auto initiate(Initiation&& init, RawToken&& token, Args&&... args) -> return_type {
return async_result<InnerToken, Signatures...>::initiate(
// here we wrap the initiation so we enable the above injection
init_wrapper<std::decay_t<Initiation>>(std::forward<Initiation>(init), token.provider),
std::move(token.token), std::forward<Args>(args)...);
}
};
// forward the other associators, such as allocator & executor
template <template <typename, typename> class Associator, typename T, typename DefaultCandidate>
struct associator<Associator, Extension::with_timeout_binder<T>, DefaultCandidate> {
typedef typename Associator<T, DefaultCandidate>::type type;
static type get(Extension::with_timeout_binder<T> const& b,
DefaultCandidate const& c = DefaultCandidate()) noexcept {
return Associator<T, DefaultCandidate>::get(b.handler, c);
}
};
// set the slot explicitly
template <typename T, typename CancellationSlot1>
struct associated_cancellation_slot<Extension::with_timeout_binder<T>, CancellationSlot1> {
typedef asio::cancellation_slot type;
static type get(Extension::with_timeout_binder<T> const& b,
CancellationSlot1 const& = CancellationSlot1()) noexcept {
return b.provider->sig_.slot();
}
};
} // namespace boost::asio
using asio::ip::tcp;
using Extension::timeout_provider;
class client : public std::enable_shared_from_this<client> {
public:
using predicate = std::function<bool(std::string const&)>;
using order_sig = void(error_code, std::string);
using handler = asio::any_completion_handler<order_sig>;
client(tcp::socket socket) : m_socket(std::move(socket)) {
std::cerr << "session started (" << m_ep << ")" << std::endl;
asio::co_spawn(m_socket.get_executor(), read_loop(), asio::detached);
}
~client() { std::cerr << "session ended (" << m_ep << ")" << std::endl; }
template <asio::completion_token_for<order_sig> CompletionToken>
auto async_order(predicate pred, CompletionToken&& token) {
return asio::async_initiate<CompletionToken, order_sig>(
[&](auto handler, predicate pred) mutable {
auto id = orderid++;
std::cerr << "initiating order #" << id << std::endl;
auto slot = asio::get_associated_cancellation_slot(handler); // before we move it!
auto& filed = m_orders
.insert({id, order(pred, std::move(handler))}) //
.first->second;
if (slot.is_connected()) {
filed.slot_ = std::move(slot);
std::shared_ptr<Pending> orders(shared_from_this(), &m_orders); // alias
filed.slot_.assign(
[id, weak = std::weak_ptr(orders), ex = m_socket.get_executor()] //
(asio::cancellation_type) {
if (auto orders = weak.lock()) {
if (auto it = orders->find(id); it != orders->end()) {
std::cerr << "cancelling order #" << id << std::endl;
auto [_, h, slot] = std::move(it->second);
slot.clear();
post(ex, std::bind(std::move(h), asio::error::operation_aborted, ""));
orders->erase(it);
} else {
std::cerr << "cancelling order #" << id << " (disappeared)" << std::endl;
}
} else {
std::cerr << "cancelling order #" << id << " (client disappeared)"
<< std::endl;
}
});
}
},
token, pred);
}
private:
tcp::socket m_socket;
tcp::endpoint m_ep = m_socket.remote_endpoint();
asio::awaitable<void> read_loop() {
auto ex = co_await asio::this_coro::executor;
try {
for (std::string buffer;
auto len = co_await async_read_until(m_socket, asio::dynamic_buffer(buffer), "\n"); //
) {
std::string const msg(buffer.substr(0, len - 1)); // remove the newline
buffer.erase(0, len);
std::erase_if(m_orders, [&](auto& kvp) {
auto& [id, order] = kvp;
bool matched = order.pred_(msg);
if (matched) {
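                        // disconnect the slot before completing, so a later
                        // cancellation signal cannot complete the same handler twice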
order.slot_.clear();
post(ex, std::bind(std::move(order.handler_), error_code{}, msg));
}
return matched;
});
}
} catch (boost::system::system_error const& se) {
std::cerr << "Read loop error: " << se.code().message() << std::endl;
}
for (auto& [id, order] : m_orders) {
order.slot_.clear();
post(ex, std::bind(std::move(order.handler_), asio::error::operation_aborted, ""));
}
}
std::atomic_uint orderid = 0;
struct order {
predicate pred_;
handler handler_;
asio::cancellation_slot slot_{};
};
using Pending = std::map<unsigned, order>;
Pending m_orders;
};
asio::awaitable<void> session(std::shared_ptr<client> cl) {
auto ex = co_await asio::this_coro::executor;
co_spawn(
ex,
[cl]() -> asio::awaitable<void> {
try {
for (;;) {
auto msg = co_await cl->async_order(
[](std::string_view v) { return v.starts_with("REQUEST"); }, //
asio::use_awaitable);
std::cout << "Request received: " << msg << std::endl;
}
} catch (std::exception const& e) {
std::cerr << "Request receive error: " << e.what() << std::endl;
}
std::cerr << "Request loop exit" << std::endl;
},
asio::detached);
asio::co_spawn(
ex,
[cl]() -> asio::awaitable<void> {
for (timeout_provider timeout(co_await asio::this_coro::executor);;) {
auto [ec, msg] =
co_await cl->async_order([](std::string_view v) { return v.starts_with("NOTIFICATION"); },
timeout(asio::as_tuple(asio::use_awaitable)));
if (!ec)
std::cout << "Notification received: " << msg << std::endl;
else {
std::cout << "Notification not received: " << ec.message() << std::endl;
if (ec == asio::error::operation_aborted) {
break;
}
}
}
std::cerr << "Notification loop exit" << std::endl;
},
asio::detached);
}
asio::awaitable<void> server(uint16_t port) {
auto ex = co_await asio::this_coro::executor;
tcp::acceptor acceptor(ex, {{}, port});
for (;;) {
std::cout << "Waiting for connection" << std::endl;
auto socket = co_await acceptor.async_accept(asio::use_awaitable);
std::cout << "Connection accepted: " << socket.remote_endpoint() << std::endl;
co_spawn(ex, session(std::make_shared<client>(std::move(socket))), asio::detached);
}
}
int main() {
asio::io_context ctx;
co_spawn(ctx, server(8000), asio::detached);
ctx.run();
}
I spent an inordinate amount of time on this (I really don't wish to tell you how many hours I spent debugging all the edge cases), and still I didn't address point 3. above.
I'd suggest GREATLY simplifying. As you may notice, your manual ("separate") completion duplicates the entirety of the cancellation-signal forwarding from the token adaptor. That tells me that it would be far simpler AND far more flexible to instead just add a duration argument to the async_order initiation function.
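A rough sketch of that direction (my illustration, not the answer's code: it reuses a cancel(id) helper like the one in the simplified answer, and omits stopping the timer when the order completes normally):
template <asio::completion_token_for<order_sig> CompletionToken>
auto async_order(predicate pred, std::chrono::steady_clock::duration d,
                 CompletionToken&& token) {
    return asio::async_initiate<CompletionToken, order_sig>(
        [this](auto handler, predicate pred, auto d) {
            auto id    = orderid++;
            auto timer = std::make_shared<asio::steady_timer>(m_socket.get_executor(), d);
            m_orders.insert({id, order(pred, std::move(handler))});
            timer->async_wait([this, id, timer](error_code ec) {
                if (!ec)        // deadline reached and not cancelled
                    cancel(id); // completes the order with operation_aborted
            });
        },
        token, pred, d);
}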
Besides, to fix all the lifetime issues and the timeouts in one go, consider Parallel Groups. It will be orders of magnitude simpler and shorter, I suspect.
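For illustration, a sketch of what that shape could look like against the final client above (my sketch, not the answer's code; wait_for_one() cancels whichever operation loses the race, which drives the same cancellation slot):
#include <boost/asio/experimental/parallel_group.hpp>
asio::awaitable<void> notification_with_timeout(client& cl) {
    auto ex = co_await asio::this_coro::executor;
    asio::steady_timer deadline(ex, 10s);
    // race the order against the timer; the loser gets cancelled automatically
    auto [completion_order, ec, msg, timer_ec] =
        co_await asio::experimental::make_parallel_group(
            cl.async_order([](std::string const& m) { return m.starts_with("NOTIFICATION"); },
                           asio::deferred),
            deadline.async_wait(asio::deferred))
            .async_wait(asio::experimental::wait_for_one(), asio::use_awaitable);
    if (completion_order[0] == 0 && !ec) // the order finished first
        std::cout << "Notification received: " << msg << std::endl;
    else
        std::cout << "Notification not received - timeout" << std::endl;
}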