Reputation: 71
Good afternoon. I am using this code exactly and would like to modify it to suit my needs. https://www.boost.org/doc/libs/1_77_0/doc/html/boost_asio/example/cpp11/echo/async_tcp_echo_server.cpp
In the function do_write, I want to raise another server, which will be on an arbitrary port, which at a certain moment could turn itself off and send a signal about this to the connection within which it was created (the connection is active all this time, of course). I cannot add a server to the io_context that is already started at the moment. is there any other possibility? I tried to create another instance of io_context and create the server on a different thread, but that also didn't work for me.
Upvotes: 1
Views: 63
Reputation: 392931
I started from that linked example and made the session
read linewise commands:
LISTEN <port>
to start a new listener on that portSTOP <port>
to stop a listenerEXIT
to close the connectionListeners are kept in a global map by port number:
using Port = uint16_t;
using Server = std::weak_ptr<class server>;
std::map<Port, Server> g_listeners;
Servers take an optional Callback
:
using Callback = std::function<void()>;
That is used to report the exit of a listener to the connection that originally started it, if it is still active.
Reading the linewise commands is pretty straightforward:
void do_read()
{
async_read_until(
socket_, data_, "\n",
[this, self = shared_from_this()](error_code ec, size_t length) {
if (ec) {
std::cerr << "do_read: " << ec.message() << std::endl;
} else {
std::string line;
getline(std::istream(&data_), line);
if (handle(line))
do_read();
}
});
}
handle
parses them - I kept it as simple as I could:
bool handle(std::string const& command)
{
try {
std::istringstream parser(command);
parser.exceptions(std::ios::failbit | std::ios::badbit);
std::string cmd;
Port port;
if (parser >> cmd && (cmd == "LISTEN") && parser >> port) {
start_listener(port);
return true;
}
parser.clear();
parser.str(command);
if (parser >> cmd && (cmd == "STOP") && parser >> port) {
stop_listener(port);
return true;
}
parser.clear();
parser.str(command);
if (parser >> cmd && (cmd == "EXIT")) {
message("Goodbye");
return false;
}
message("Invalid command"s);
} catch (std::exception const& e) {
message("Invalid argument"s);
}
return true;
}
Now the commands start_listener
and stop_listener
insert or erase server
instances from the g_listeners
container:
void session::start_listener(Port port)
{
auto [it, inserted] = g_listeners.try_emplace(port);
if (!inserted) {
message("Already listening on port " + std::to_string(port));
} else {
auto on_close = [handle = weak_from_this(), port] {
if (auto self = handle.lock())
self->message("The listener for port " + std::to_string(port) + " has closed");
};
auto s = std::make_shared<server>( //
socket_.get_executor(), port, on_close);
it->second = s;
s->start();
message("Started listening on port " + std::to_string(port));
}
}
void session::stop_listener(Port port)
{
auto it = g_listeners.find(port);
if (it != g_listeners.end()) {
if (auto server = it->second.lock())
{
message("Stopping listener on port " + std::to_string(port));
server->stop();
} else {
// when two connections simultaneously STOP the same listener?
message("Listener on port " + std::to_string(port) + " already stopped");
}
g_listeners.erase(it);
} else {
message("No listener on port " + std::to_string(port));
}
}
Note that weak pointers are used on both ends to prevent operating on already-destructed instances e.g.
STOP
from multiple connections at the exact same timeFor the response messages I use the outbox_
pattern so the buffer lifetimes are guaranteed even if more than one message is queued.
For the server, I changed the io_context&
argument to be an executor
; the result is equivalent but it's much simpler to get an executor from any IO object than to pass a reference to the io_context&
around. Of course, you can "cheat" by making io_context
a global variable.
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <iomanip>
using boost::asio::ip::tcp;
using boost::system::error_code;
using executor_type = boost::asio::any_io_executor;
using namespace std::literals;
using Port = uint16_t;
using Callback = std::function<void()>;
using Server = std::weak_ptr<class server>;
std::map<Port, Server> g_listeners;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket) : socket_(std::move(socket)) {}
void start()
{
do_read();
message("Welcome");
}
private:
void do_read()
{
async_read_until(
socket_, data_, "\n",
[this, self = shared_from_this()](error_code ec, size_t length) {
if (ec) {
std::cerr << "do_read: " << ec.message() << std::endl;
} else {
std::string line;
getline(std::istream(&data_), line);
if (handle(line))
do_read();
}
});
}
bool handle(std::string const& command)
{
try {
std::istringstream parser(command);
parser.exceptions(std::ios::failbit | std::ios::badbit);
std::string cmd;
Port port;
if (parser >> cmd && (cmd == "LISTEN") && parser >> port) {
start_listener(port);
return true;
}
parser.clear();
parser.str(command);
if (parser >> cmd && (cmd == "STOP") && parser >> port) {
stop_listener(port);
return true;
}
parser.clear();
parser.str(command);
if (parser >> cmd && (cmd == "EXIT")) {
message("Goodbye");
return false;
}
message("Invalid command"s);
} catch (std::exception const& e) {
message("Invalid argument"s);
}
return true;
}
void message(std::string msg) {
outbox_.push_back(std::move(msg) + "\n");
if (outbox_.size() == 1)
do_write();
}
void do_write() {
async_write( //
socket_, boost::asio::buffer(outbox_.front()),
[this, self = shared_from_this()](error_code ec, size_t) {
if (ec) {
std::cerr << "do_write: " << ec.message() << std::endl;
}
outbox_.pop_front();
if (!outbox_.empty())
do_write();
});
}
void start_listener(Port port);
void stop_listener(Port port);
tcp::socket socket_;
boost::asio::streambuf data_{32*1024}; // max size
std::deque<std::string> outbox_;
};
class server: public std::enable_shared_from_this<server> {
public:
server(
executor_type exe, short port, Callback callback = [] {})
: acceptor_(exe, tcp::endpoint(tcp::v4(), port))
, on_close_(callback)
{
}
void start() { do_accept(); }
void stop() { acceptor_.close(); }
private:
void do_accept()
{
acceptor_.async_accept(
[this, self = shared_from_this()] //
(error_code ec, tcp::socket socket) {
if (!ec) {
std::make_shared<session>(std::move(socket))->start();
do_accept();
} else {
if (on_close_)
on_close_();
}
});
}
tcp::acceptor acceptor_;
Callback on_close_;
};
void session::start_listener(Port port)
{
auto [it, inserted] = g_listeners.try_emplace(port);
if (!inserted) {
message("Already listening on port " + std::to_string(port));
} else {
auto on_close = [handle = weak_from_this(), port] {
if (auto self = handle.lock())
self->message("The listener for port " + std::to_string(port) + " has closed");
};
auto s = std::make_shared<server>( //
socket_.get_executor(), port, on_close);
it->second = s;
s->start();
message("Started listening on port " + std::to_string(port));
}
}
void session::stop_listener(Port port)
{
auto it = g_listeners.find(port);
if (it != g_listeners.end()) {
if (auto server = it->second.lock())
{
message("Stopping listener on port " + std::to_string(port));
server->stop();
} else {
// when two connections simultaneously STOP the same listener?
message("Listener on port " + std::to_string(port) + " already stopped");
}
g_listeners.erase(it);
} else {
message("No listener on port " + std::to_string(port));
}
}
int main(int argc, char* argv[])
{
try {
if (argc != 2) {
std::cerr << "Usage: async_tcp_echo_server <port>\n";
return 1;
}
boost::asio::io_context io_context;
{
Port port = std::atoi(argv[1]);
auto s = std::make_shared<server>(io_context.get_executor(), port);
s->start();
g_listeners.emplace(port, s);
}
io_context.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
Which demo's interactively:
Upvotes: 1