raycons

Reputation: 825

Asio Peer to Peer Network programming

I was digging through the Asio documentation for sockets, but I couldn't find anything useful on how to handle the following situation:

Assume there are many servers in a peer-to-peer network (up to 1000). The servers will have to communicate with each other regularly, so I do not want to open a new client connection every time one needs to send a message to another server (huge overhead).

At the same time, creating n threads, one per client -> server connection, is not really viable either.

I'll implement different communication schemes (all-to-all, star, and tree), so 1, log(n), or n of the servers will have to instantiate n client sockets to connect to the other servers.

Is there a good way to simply do the following (pseudocode)?

pool = ConnectionPool.create(vector<IP>);
pool.sendMessage(ip, message);

I know on the server side I can use an async connection. However, I don't really know how to handle it from the "client" (sender) perspective in C++/Asio.

TL;DR:

Which APIs and classes am I supposed to use when I want to "send" messages to N servers, without opening N new connections every time and without using N threads?

Upvotes: 3

Views: 2446

Answers (2)

sehe

Reputation: 392921

Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections

Then you must not have looked in the right place, or not very far at all.

A core tenet of async IO is multiplexing IO on a single thread (all of the kqueue/epoll/select/IO completion port etc. abstractions are geared towards that goal).

Here's an absolutely lazy-coded demonstration that shows:

  • single threaded everything
  • a listener that accepts unbounded clients (we could easily add additional listeners)
  • we connect to a collection of "peers"
  • on a heartbeat interval we send all the peers a heartbeat message

        for (auto& peer : peers)
            async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
            });
    
  • additionally it handles asynchronous process signals (INT, TERM) to shut down all the async operations

"Live¹" On Coliru

#include <boost/asio.hpp>
#include <cstring>     // strsignal
#include <functional>  // std::function
#include <iostream>
#include <list>
using std::tuple;
using namespace std::literals;

template <typename T>
static auto reference_eq(T const& obj) {
    return [p=&obj](auto& ref) { return &ref == p; };
}

int main() {
    using namespace boost::asio; // don't be this lazy please
    using boost::system::error_code;
    using ip::tcp;

    io_context ioc;
    tcp::acceptor listener(ioc, {{}, 6868});
    listener.set_option(tcp::acceptor::reuse_address(true));
    listener.listen();

    using Loop = std::function<void()>;

    std::list<tcp::socket> clients, peers;

    // accept unbounded clients
    Loop accept_loop = [&] {
        listener.async_accept([&](error_code const& ec, tcp::socket s) {
            if (!ec) {
                std::cout << "New session " << s.remote_endpoint() << std::endl;
                clients.push_back(std::move(s));
                accept_loop();
            }
        });
    };

    tcp::resolver resolver(ioc);
    for (auto [host,service] : {
                tuple{"www.example.com", "http"}, 
                {"localhost", "6868"}, 
                {"::1", "6868"}, 
                // ...
            })
    {
        auto& p = peers.emplace_back(ioc);
        async_connect(p, resolver.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
            std::cout << "For " << spec << " (" << ec.message() << ")";
            if (!ec)
                std::cout << " " << p.remote_endpoint();
            else
                peers.remove_if(reference_eq(p));
            std::cout << std::endl;
        });
    }

    std::string const message = "heartbeat\n";
    high_resolution_timer timer(ioc);
    Loop heartbeat = [&]() mutable {
        timer.expires_from_now(2s);
        timer.async_wait([&](error_code ec) {
            std::cout << "heartbeat " << ec.message() << std::endl;
            if (ec)
                return;
            for (auto& peer : peers)
                async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                    std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
                });
            heartbeat();
        });
    };

    signal_set sigs(ioc, SIGINT, SIGTERM);
    sigs.async_wait([&](error_code ec, int sig) {
        if (!ec) {
            std::cout << "signal: " << strsignal(sig) << std::endl;
            listener.cancel();
            timer.cancel();
        } });

    accept_loop();
    heartbeat();

    ioc.run_for(10s); // max time for Coliru, or just `run()`
}

Prints (on my system):

New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled

Note how the one client ("New session") is our own peer connection on localhost:6868 :)

Of course, in real life you would have a class to represent a client session, perhaps have queues for messages pending sending, and optionally run on multiple threads (using strands to synchronize access to shared objects).

OTHER SAMPLES

If you really wish to avoid an explicit collection of clients, see this very similar demo: How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server, which:

  • also starts single-threaded, but adds a thread pool for strand demonstration purposes
  • has a heartbeat timer per session, meaning that each session can have its own frequency

¹ It's not working on Coliru because of limited network access. A loopback-only version without resolver use works: Live On Coliru

Upvotes: 4

Superlokkus

Reputation: 5039

Since you stated you want to use TCP, i.e. a connection-based protocol, you can use the asynchronous Asio API and rely on a single thread, because asynchronous (i.e. reactor-pattern) calls do not block.

Your server would use boost::asio::async_write on a boost::asio::ip::tcp::socket, each of which corresponds to one TCP connection. The callback you give async_write as a parameter will be called when sending is done, but async_write itself returns immediately. Receiving works similarly, on the client as well. To establish a TCP connection in the first place, the client would use a boost::asio::ip::tcp::resolver to look up the remote host via boost::asio::ip::tcp::resolver::async_resolve and then connect the socket, while the server would use a boost::asio::ip::tcp::acceptor, initialized with a boost::asio::ip::tcp::endpoint, and boost::asio::ip::tcp::acceptor::async_accept to accept incoming connections. You might actually need 2 acceptors, one each for IPv4 and IPv6.

Since a TCP connection carries some state on the server side, you would ordinarily have to track it in a central place. To avoid that contention and ease the pattern, it is common to use a class which inherits from std::enable_shared_from_this and passes a std::shared_ptr to itself into the callbacks given to boost::asio::async_write, so that between sending and receiving, while no thread is blocked in the usual sense, the session is not forgotten, i.e. deleted.

For reading I recommend boost::asio::async_read_until and in general a boost::asio::streambuf.

With this, one thread running boost::asio::io_context::run in a loop would suffice; it unblocks every time one of the many connections has received data to process or something new to send.

The general project is a bit out of scope here; it would help if you could narrow your question a bit, or better, read the talks and examples. I have written something similar to what you intend, a resilient overlay network: https://github.com/Superlokkus/code

Upvotes: 1
