Vlad Sahar
Vlad Sahar

Reputation: 71

How to run second server from first?

Good afternoon. I am using this code exactly and would like to modify it to suit my needs. https://www.boost.org/doc/libs/1_77_0/doc/html/boost_asio/example/cpp11/echo/async_tcp_echo_server.cpp

In the function do_write, I want to raise another server, which will be on an arbitrary port, which at a certain moment could turn itself off and send a signal about this to the connection within which it was created (the connection is active all this time, of course). I cannot add a server to the io_context that is already started at the moment. is there any other possibility? I tried to create another instance of io_context and create the server on a different thread, but that also didn't work for me.

Upvotes: 1

Views: 63

Answers (1)

sehe
sehe

Reputation: 392931

I started from that linked example and made the session read linewise commands:

  • LISTEN <port> to start a new listener on that port
  • STOP <port> to stop a listener
  • EXIT to close the connection

Listeners are kept in a global map by port number:

using Port     = uint16_t;
using Server   = std::weak_ptr<class server>;
std::map<Port, Server> g_listeners;

Servers take an optional Callback:

using Callback = std::function<void()>;

That is used to report the exit of a listener to the connection that originally started it, if it is still active.

Commands

Reading the linewise commands is pretty straightforward:

void do_read()
{
    async_read_until(
        socket_, data_, "\n",
        [this, self = shared_from_this()](error_code ec, size_t length) {
            if (ec) {
                std::cerr << "do_read: " << ec.message() << std::endl;
            } else {
                std::string line;
                getline(std::istream(&data_), line);
                if (handle(line))
                    do_read();
            }
        });
}

handle parses them - I kept it as simple as I could:

bool handle(std::string const& command)
{
    try {
        std::istringstream parser(command);
        parser.exceptions(std::ios::failbit | std::ios::badbit);

        std::string cmd;
        Port        port;

        if (parser >> cmd && (cmd == "LISTEN") && parser >> port) {
            start_listener(port);
            return true;
        }

        parser.clear();
        parser.str(command);
        if (parser >> cmd && (cmd == "STOP") && parser >> port) {
            stop_listener(port);
            return true;
        }

        parser.clear();
        parser.str(command);
        if (parser >> cmd && (cmd == "EXIT")) {
            message("Goodbye");
            return false;
        }
        message("Invalid command"s);
    } catch (std::exception const& e) {
        message("Invalid argument"s);
    }
    return true;
}

Now the commands start_listener and stop_listener insert or erase server instances from the g_listeners container:

void session::start_listener(Port port)
{
    auto [it, inserted] = g_listeners.try_emplace(port);

    if (!inserted) {
        message("Already listening on port " + std::to_string(port));
    } else {
        auto on_close = [handle = weak_from_this(), port] {
            if (auto self = handle.lock())
                self->message("The listener for port " + std::to_string(port) + " has closed");
        };
        auto s = std::make_shared<server>( //
            socket_.get_executor(), port, on_close);
        it->second = s;
        s->start();
        message("Started listening on port " + std::to_string(port));
    }
}

void session::stop_listener(Port port)
{
    auto it = g_listeners.find(port);
    if (it != g_listeners.end()) {
        if (auto server = it->second.lock())
        {
            message("Stopping listener on port " + std::to_string(port));
            server->stop();
        } else {
            // when two connections simultaneously STOP the same listener?
            message("Listener on port " + std::to_string(port) + " already stopped");
        }
        g_listeners.erase(it);
    } else {
        message("No listener on port " + std::to_string(port));
    }
}

Note that weak pointers are used on both ends to prevent operating on already-destructed instances e.g.

  • when the connection that started a listener has closed before the callback can be executed
  • when a listener is being commanded to STOP from multiple connections at the exact same time

Other Notes

For the response messages I use the outbox_ pattern so the buffer lifetimes are guaranteed even if more than one message is queued.

For the server, I changed the io_context& argument to be an executor; the result is equivalent but it's much simpler to get an executor from any IO object than to pass a reference to the io_context& around. Of course, you can "cheat" by making io_context a global variable.

Demo

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <iomanip>

using boost::asio::ip::tcp;
using boost::system::error_code;
using executor_type = boost::asio::any_io_executor;
using namespace std::literals;

using Port     = uint16_t;
using Callback = std::function<void()>;
using Server   = std::weak_ptr<class server>;
std::map<Port, Server> g_listeners;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start()
    {
        do_read();
        message("Welcome");
    }

  private:
    void do_read()
    {
        async_read_until(
            socket_, data_, "\n",
            [this, self = shared_from_this()](error_code ec, size_t length) {
                if (ec) {
                    std::cerr << "do_read: " << ec.message() << std::endl;
                } else {
                    std::string line;
                    getline(std::istream(&data_), line);
                    if (handle(line))
                        do_read();
                }
            });
    }

    bool handle(std::string const& command)
    {
        try {
            std::istringstream parser(command);
            parser.exceptions(std::ios::failbit | std::ios::badbit);

            std::string cmd;
            Port        port;

            if (parser >> cmd && (cmd == "LISTEN") && parser >> port) {
                start_listener(port);
                return true;
            }

            parser.clear();
            parser.str(command);
            if (parser >> cmd && (cmd == "STOP") && parser >> port) {
                stop_listener(port);
                return true;
            }

            parser.clear();
            parser.str(command);
            if (parser >> cmd && (cmd == "EXIT")) {
                message("Goodbye");
                return false;
            }
            message("Invalid command"s);
        } catch (std::exception const& e) {
            message("Invalid argument"s);
        }
        return true;
    }

    void message(std::string msg) {
        outbox_.push_back(std::move(msg) + "\n");
        if (outbox_.size() == 1)
            do_write();
    }

    void do_write() {
        async_write( //
            socket_, boost::asio::buffer(outbox_.front()),
            [this, self = shared_from_this()](error_code ec, size_t) {
                if (ec) {
                    std::cerr << "do_write: " << ec.message() << std::endl;
                }
                outbox_.pop_front();
                if (!outbox_.empty())
                    do_write();
            });
    }

    void start_listener(Port port);
    void stop_listener(Port port);

    tcp::socket socket_;
    boost::asio::streambuf data_{32*1024}; // max size
    std::deque<std::string> outbox_;
};

class server: public std::enable_shared_from_this<server> {
  public:
    server(
        executor_type exe, short port, Callback callback = [] {})
        : acceptor_(exe, tcp::endpoint(tcp::v4(), port))
        , on_close_(callback)
    {
    }

    void start() { do_accept(); }
    void stop() { acceptor_.close(); }

  private:
    void do_accept()
    {
        acceptor_.async_accept(
            [this, self = shared_from_this()] //
            (error_code ec, tcp::socket socket) {
                if (!ec) {
                    std::make_shared<session>(std::move(socket))->start();
                    do_accept();
                } else {
                    if (on_close_)
                        on_close_();
                }
            });
    }

    tcp::acceptor acceptor_;
    Callback      on_close_;
};

void session::start_listener(Port port)
{
    auto [it, inserted] = g_listeners.try_emplace(port);

    if (!inserted) {
        message("Already listening on port " + std::to_string(port));
    } else {
        auto on_close = [handle = weak_from_this(), port] {
            if (auto self = handle.lock())
                self->message("The listener for port " + std::to_string(port) + " has closed");
        };
        auto s = std::make_shared<server>( //
            socket_.get_executor(), port, on_close);
        it->second = s;
        s->start();
        message("Started listening on port " + std::to_string(port));
    }
}

void session::stop_listener(Port port)
{
    auto it = g_listeners.find(port);
    if (it != g_listeners.end()) {
        if (auto server = it->second.lock())
        {
            message("Stopping listener on port " + std::to_string(port));
            server->stop();
        } else {
            // when two connections simultaneously STOP the same listener?
            message("Listener on port " + std::to_string(port) + " already stopped");
        }
        g_listeners.erase(it);
    } else {
        message("No listener on port " + std::to_string(port));
    }
}

int main(int argc, char* argv[])
{
    try {
        if (argc != 2) {
            std::cerr << "Usage: async_tcp_echo_server <port>\n";
            return 1;
        }

        boost::asio::io_context io_context;

        {
            Port port = std::atoi(argv[1]);
            auto s = std::make_shared<server>(io_context.get_executor(), port);
            s->start();

            g_listeners.emplace(port, s);
        }

        io_context.run();
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }
}

Which demo's interactively:

enter image description here

Upvotes: 1

Related Questions