Reputation: 825
I was digging through the Asio documentation for sockets, but I couldn't find anything useful on how to handle the following situation:
I expect to have a lot of servers in a peer-to-peer network (up to 1000). The servers will have to communicate with each other regularly, so I do not want to open a new client connection every time one server needs to send a message to another (huge overhead).
At the same time, creating n threads that each correspond to a client -> server connection is also not really viable.
I'll implement different communication schemes (all-to-all, star and tree) so 1, log(n) and n of the servers will have to instantiate those n socket clients to create a connection to the other servers.
Is there a good way to simply do something like this (pseudocode)?
pool = ConnectionPool.create(vector<IP>);
pool.sendMessage(ip, message);
I know on the server side I can use an async connection. However, I don't really know how to handle it from the "client" (sender) perspective in C++/Asio.
TL;DR:
Which APIs and classes am I supposed to use when I want to send messages to N servers without opening N new connections every time I do that, and without using N threads?
Upvotes: 3
Views: 2446
Reputation: 392921
Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections
Then you must not have looked in the right place, or not very far at all.
A core tenet of async IO is multiplexing IO on a single thread (all of the kqueue/epoll/select/IO completion ports etc. abstractions are geared towards that goal).
Here's an absolutely lazy-coded demonstration that shows:
on a heartbeat interval we send all the peers a heartbeat message
    for (auto& peer : peers)
        async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
            std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
        });
#include <boost/asio.hpp>
#include <functional> // std::function
#include <iostream>
#include <list>
#include <string>
#include <string.h>   // strsignal (POSIX)
#include <tuple>
using std::tuple;
using namespace std::literals;

// Builds a predicate that matches exactly one object, by address
template <typename T>
static auto reference_eq(T const& obj) {
    return [p=&obj](auto& ref) { return &ref == p; };
}

int main() {
    using namespace boost::asio; // don't be this lazy please
    using boost::system::error_code;
    using ip::tcp;

    io_context ioc;
    tcp::acceptor listener(ioc, {{}, 6868});
    listener.set_option(tcp::acceptor::reuse_address(true));
    listener.listen();

    using Loop = std::function<void()>;
    std::list<tcp::socket> clients, peers;

    // accept unbounded clients
    Loop accept_loop = [&] {
        listener.async_accept([&](error_code const& ec, tcp::socket s) {
            if (!ec) {
                std::cout << "New session " << s.remote_endpoint() << std::endl;
                clients.push_back(std::move(s));
                accept_loop();
            }
        });
    };

    // connect to every peer; all connects run concurrently on the one thread
    tcp::resolver resolver(ioc);
    for (auto [host,service] : {
            tuple{"www.example.com", "http"},
            {"localhost", "6868"},
            {"::1", "6868"},
            // ...
        })
    {
        auto& p = peers.emplace_back(ioc);
        async_connect(p, resolver.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
            std::cout << "For " << spec << " (" << ec.message() << ")";
            if (!ec)
                std::cout << " " << p.remote_endpoint();
            else
                peers.remove_if(reference_eq(p)); // drop peers that failed to connect
            std::cout << std::endl;
        });
    }

    std::string const message = "heartbeat\n";
    high_resolution_timer timer(ioc);

    // every 2s, send the heartbeat message to all connected peers
    Loop heartbeat = [&]() mutable {
        timer.expires_after(2s);
        timer.async_wait([&](error_code ec) {
            std::cout << "heartbeat " << ec.message() << std::endl;
            if (ec)
                return;
            for (auto& peer : peers)
                async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                    std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
                });
            heartbeat();
        });
    };

    // stop accepting and stop the heartbeat on Ctrl-C / SIGTERM
    signal_set sigs(ioc, SIGINT, SIGTERM);
    sigs.async_wait([&](error_code ec, int sig) {
        if (!ec) {
            std::cout << "signal: " << strsignal(sig) << std::endl;
            listener.cancel();
            timer.cancel();
        }
    });

    accept_loop();
    heartbeat();
    ioc.run_for(10s); // max time for Coliru, or just `run()`
}
Prints (on my system):
New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled
Note how the one client ("New session") is our own peer connection on localhost:6868 :)
Of course, in real life you would have a class to represent a client session, perhaps have queues for messages pending sending, and optionally run on multiple threads (using strands to synchronize access to shared objects).
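To make the "queues for messages pending sending" idea concrete, here is a minimal sketch of the queueing discipline on its own, without Asio. The reason for the queue is that Asio requires that no second `async_write` is started on a socket while an earlier one is still in progress. `FakeSocket` and `PeerSession` are hypothetical names invented for this illustration; `FakeSocket::async_write` only stands in for `boost::asio::async_write`, and `complete_one` plays the role of the `io_context` delivering a completion.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket. "async_write" records the data and
// stores the completion handler so that the caller can fire it later, the
// way an event loop would once the bytes have actually been sent.
struct FakeSocket {
    std::vector<std::string> wire;            // what was "sent", in order
    std::function<void()> pending_completion; // at most one write in flight

    void async_write(std::string const& data, std::function<void()> on_done) {
        assert(!pending_completion && "overlapping writes on one socket");
        wire.push_back(data);
        pending_completion = std::move(on_done);
    }

    // Simulates the event loop delivering the write completion.
    void complete_one() {
        auto handler = std::move(pending_completion);
        pending_completion = nullptr;
        if (handler) handler();
    }
};

// One session per peer: messages are queued, and only the front of the
// queue is ever being written.
class PeerSession {
public:
    explicit PeerSession(FakeSocket& sock) : sock_(sock) {}

    void send(std::string message) {
        outbox_.push_back(std::move(message));
        if (outbox_.size() == 1) // nothing in flight yet, so start writing
            write_next();
    }

    std::size_t queued() const { return outbox_.size(); }

private:
    void write_next() {
        // The front element stays in the queue until its write completes,
        // which keeps the data alive for the duration of the operation.
        sock_.async_write(outbox_.front(), [this] {
            outbox_.pop_front();
            if (!outbox_.empty())
                write_next();
        });
    }

    FakeSocket& sock_;
    std::deque<std::string> outbox_;
};
```

Calling `send` three times in a row starts only one write; each call to `complete_one` then pops the finished message and starts the next. In a real session the lambda would be the `async_write` completion handler and would also have to look at the `error_code`.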
If you really wish to avoid an explicit collection of clients, see this very similar demo: How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server which
¹ It does not work on Coliru because network access is limited there. A loopback-only version without the resolver works: Live On Coliru
Upvotes: 4
Reputation: 5039
Since you stated you want to use TCP, i.e. a connection-based protocol, you can use the asynchronous Asio API and rely on a single thread, because asynchronous calls do not block.
Your server would call boost::asio::async_write on a boost::asio::ip::tcp::socket, which represents one TCP connection. The callback you pass to async_write is called once sending has finished, but async_write itself returns immediately. Receiving works the same way. To establish the connections, the client side uses a boost::asio::ip::tcp::resolver (boost::asio::ip::tcp::resolver::async_resolve) to turn a host name into endpoints and then connects a socket to one of them, for example with boost::asio::async_connect. The server side uses a boost::asio::ip::tcp::acceptor initialized with a boost::asio::ip::tcp::endpoint and calls boost::asio::ip::tcp::acceptor::async_accept. You may actually need two acceptors, one for IPv4 and one for IPv6.
Since each TCP connection carries some state on the server side, you would ordinarily have to track it in a central place. To avoid that contention and simplify the pattern, it is common to use a session class that inherits from std::enable_shared_from_this and captures a std::shared_ptr to itself in the callback passed to boost::asio::async_write, so that between sending and receiving, while no thread is blocked on the connection in the usual sense, the session is not forgotten, i.e. destroyed.
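The lifetime trick can be shown in isolation. The following is a minimal sketch using only the standard library: `FakeLoop` is a hypothetical stand-in for `boost::asio::io_context` (it just stores handlers and runs them later), and `Session`, `start_send` and the log vector are names made up for the illustration. The point is only that the handler's captured `shared_ptr` keeps the session alive after every other owner has let go.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event loop: handlers are stored and run later.
struct FakeLoop {
    std::vector<std::function<void()>> handlers;

    void post(std::function<void()> h) { handlers.push_back(std::move(h)); }

    void run() {
        auto ready = std::move(handlers);
        handlers.clear();
        for (auto& h : ready)
            h();
        // `ready` is destroyed here, releasing whatever the handlers captured
    }
};

class Session : public std::enable_shared_from_this<Session> {
public:
    Session(FakeLoop& loop, std::string name)
        : loop_(loop), name_(std::move(name)) {}

    // Starts a pretend asynchronous send. The handler owns a shared_ptr to
    // this session, so the session cannot be destroyed before it has run.
    void start_send(std::string msg, std::vector<std::string>& log) {
        auto self = shared_from_this();
        loop_.post([self, msg = std::move(msg), &log] {
            log.push_back(self->name_ + " sent " + msg);
        });
    }

private:
    FakeLoop& loop_;
    std::string name_;
};
```

If the only `shared_ptr` outside the handler goes out of scope right after `start_send`, the session still exists when `run()` invokes the handler, and it is destroyed once the handler itself is released. Note that `shared_from_this()` requires the object to be owned by a `std::shared_ptr` already, for example one created with `std::make_shared`.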
For reading I recommend boost::asio::async_read_until and, in general, a boost::asio::streambuf.
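The reason a delimiter-based read is useful: TCP is a byte stream, so a message such as "heartbeat\n" may arrive in pieces, or several messages may arrive in one chunk. The sketch below is not Asio code; it is a hand-rolled, hypothetical `LineFramer` that only illustrates the framing logic, including the detail that bytes after the last delimiter must stay buffered for the next read (with `async_read_until`, the streambuf can likewise hold data beyond the delimiter).

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: accumulates raw chunks and splits them into
// '\n'-terminated messages.
class LineFramer {
public:
    // Feed one chunk as it arrived from the network. Returns every complete
    // message that is now available, with the delimiter stripped.
    std::vector<std::string> feed(std::string const& chunk) {
        buffer_ += chunk;
        std::vector<std::string> lines;
        std::string::size_type pos;
        while ((pos = buffer_.find('\n')) != std::string::npos) {
            lines.push_back(buffer_.substr(0, pos));
            buffer_.erase(0, pos + 1); // drop the message and its delimiter
        }
        return lines;
    }

    // Bytes received so far that do not yet form a complete message.
    std::string const& pending() const { return buffer_; }

private:
    std::string buffer_;
};
```

Feeding "heart" yields nothing, and feeding "beat\nping\npo" afterwards yields the two messages "heartbeat" and "ping" while "po" stays pending until the rest of that line arrives.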
This way, a single thread that runs boost::asio::io_context::run would suffice: it wakes up every time one of the many connections has received data that needs processing, or something new has to be generated and sent.
The overall project is a bit out of scope here. It would help if you could narrow your question a bit, or better, go through the talks and examples. I have written something similar to what you intend, a resilient overlay network: https://github.com/Superlokkus/code
Upvotes: 1