Reputation: 51
I'm using asio (non-boost) to create a TCP server and, while my code works, it's not done properly because I'm calling asio::async_write
from multiple threads. I think I should use strands, but the more I read about them the more lost I am.
#include <cstdlib>
#include <iostream>
#include <utility>
#include <thread>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
#include "messages.h"
using asio::ip::tcp;
// One client connection. Handlers capture a shared_ptr obtained from
// shared_from_this(), so the object stays alive while a read or write is pending.
class session
: public std::enable_shared_from_this<session>
{
public:
// Takes ownership of an already-connected socket.
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
// Resets the message handler, sends its initial message with a blocking
// write, then starts the asynchronous read loop.
void start()
{
handler = MessageHandler();
asio::write(socket_, asio::buffer(handler.initialMessage()));
do_read();
}
private:
// Reads up to max_length bytes into data_, appends them to buffer_, and
// splits buffer_ into '\0'-terminated messages. Each complete message is
// handed to process_message on its own detached thread; then the next read
// is queued.
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(asio::buffer(data_, max_length),
[this, self](std::error_code ec, std::size_t length)
{
if (!ec)
{
buffer_.append(data_, length);
size_t pos;
while ((pos = buffer_.find('\0')) != std::string::npos)
{
// Extract the message without its terminator, then drop message and
// terminator from the front of buffer_.
std::string message = buffer_.substr(0, pos);
buffer_.erase(0, pos + 1);
// NOTE(review): the thread receives the raw `this` pointer, not `self`;
// nothing visible here keeps the session alive until the detached
// thread has finished.
std::thread(&session::process_message, this, message).detach();
}
do_read();
}
else if (ec != asio::error::eof)
{
// eof is not reported; any other error is logged. In both cases no
// further read is queued.
std::cerr << "Read error: " << ec.message() << '\n';
}
});
}
// Starts an asynchronous write of `message` on the socket.
// NOTE(review): this is reached from process_message, which runs on the
// detached threads started in do_read, so several async_write operations can
// be started on the same socket concurrently.
// NOTE(review): asio::buffer(message) refers to the by-value parameter, which
// goes out of scope when do_write returns; confirm against the Asio buffer
// lifetime requirements for async_write.
void do_write(std::string message)
{
auto self(shared_from_this());
asio::async_write(socket_, asio::buffer(message),
[this, self](std::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
// Nothing to do after a successful write.
}
else if (ec != asio::error::eof)
{
std::cerr << "Write error: " << ec.message() << '\n';
}
});
}
// Computes the response for one message via the handler and sends it.
void process_message(std::string message) {
std::string response = handler.processMessage(message);
do_write(response);
}
tcp::socket socket_;
enum { max_length = 1024 };
// Scratch area filled by each async_read_some call.
char data_[max_length];
// Bytes received so far that do not yet form a complete message.
std::string buffer_;
MessageHandler handler;
};
// Listens on a TCP/IPv4 port and starts a `session` for every accepted
// connection.
class server
{
public:
    // Binds the acceptor to `port` on `io_context` and queues the first accept.
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
          socket_(io_context)
    {
        do_accept();
    }

private:
    // Queues one asynchronous accept into socket_.
    void do_accept()
    {
        acceptor_.async_accept(socket_,
            [this](std::error_code ec) { on_accept(ec); });
    }

    // Completion handler for an accept: on success the connected socket is
    // moved into a new session, and in every case the next accept is queued.
    void on_accept(std::error_code const& ec)
    {
        if (!ec)
        {
            auto connection = std::make_shared<session>(std::move(socket_));
            connection->start();
        }
        do_accept();
    }

    tcp::acceptor acceptor_;
    // Receives the next accepted connection; moved from after each accept.
    tcp::socket socket_;
};
void serverInit()
{
try
{
asio::io_context io_context;
server s(io_context, 0);
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << '\n';
}
}
Upvotes: 2
Views: 1553
Reputation: 393084
You only have one thread running the IO service, so everything is on an implicit strand (see "Why do I need strand per connection when using boost::asio?"). There is no need to worry UNTIL you start using a new thread.
The simplest fix, then, would seem to be to make sure that sending the replies happens on the IO service as well:
void process_message(std::string const& message) {
std::string response = handler.processMessage(message);
post(socket_.get_executor(),
std::bind(&session::do_write, shared_from_this(), response));
}
Now if you wanted to be able to run the IO services on multiple threads, you would just make sure that the socket uses a strand executor.
This doesn't guarantee that you won't see overlapping async_write operations, because the speed at which incoming messages are processed might be higher than the speed at which they are sent. Therefore the customary solution is to keep a FIFO queue of outgoing messages and to have at most one async_write in flight at any time.
In my examples I typically call this FIFO queue "outbox_" and I prefer to use deque
for reasons of iterator/reference stability (see Iterator invalidation rules for C++ containers):
// Appends a message to the outbox. Must run on the (implicit) strand.
// The write loop is started only when the outbox was empty beforehand;
// otherwise a write chain is already in progress and will pick the message up.
void do_write(std::string message)
{
    bool const was_idle = outbox_.empty();
    outbox_.push_back(std::move(message));
    if (was_idle) {
        write_loop();
    }
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()] //
(error_code ec, std::size_t /*length*/) {
if (!ec) {
outbox_.pop_front();
write_loop();
} else if (ec != asio::error::eof) {
std::cerr << "Write error: " << ec.message() << '\n';
}
});
}
Here's a fixed listing with a stub message.h.
It also greatly simplifies the reading/buffer handling by using the existing async_read_until
composed operation, which does everything you had manually written.
#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>
#if 0
#include "messages.h"
#else // mock messages.h
#include <boost/lexical_cast.hpp>
#include <iomanip>
// Stand-in for the real messages.h: produces a fixed greeting and echoes each
// request back in quoted form.
struct MessageHandler {
    // Greeting sent once when a session starts.
    std::string initialMessage() const { return std::string("Initial\n"); }

    // Returns "Processed " followed by the quoted request and a newline.
    std::string processMessage(std::string const& req) const {
        std::string reply = "Processed ";
        reply += boost::lexical_cast<std::string>(std::quoted(req));
        reply += "\n";
        return reply;
    }
};
#endif
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() {
handler = MessageHandler();
asio::write(socket_, asio::buffer(handler.initialMessage()));
do_read();
}
private:
void do_read() {
async_read_until(
socket_, asio::dynamic_buffer(buffer_), '\0',
[this, self = shared_from_this()] //
(error_code ec, std::size_t length) {
if (!ec) {
std::thread(&session::process_message, this, buffer_.substr(0, length - 1)).detach();
buffer_.erase(0, length);
do_read();
} else if (ec != asio::error::eof) {
std::cerr << "Read error: " << ec.message() << std::endl;
}
});
}
void do_write(std::string message)
{
outbox_.push_back(std::move(message)); // assumed on (implicit) strand
if (outbox_.size() == 1) {
write_loop();
}
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()] //
(error_code ec, std::size_t /*length*/) {
if (!ec) {
outbox_.pop_front();
write_loop();
} else if (ec != asio::error::eof) {
std::cerr << "Write error: " << ec.message() << std::endl;
}
});
}
void process_message(std::string const& message) {
std::string response = handler.processMessage(message);
// dispatch/post to executor because we are on a different thread
post(socket_.get_executor(),
std::bind(&session::do_write, shared_from_this(), response));
}
tcp::socket socket_;
std::string buffer_;
std::deque<std::string> outbox_;
MessageHandler handler;
};
class server
{
public:
server(asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
socket_(io_context)
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_, [this](error_code ec) {
if (!ec) {
std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
std::make_shared<session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
void serverInit() {
try {
asio::io_context io_context;
server s(io_context, 8989);
io_context.run();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
}
// Program entry point: runs the server until serverInit returns.
int main()
{
    serverInit();
    return 0;
}
When sending a large burst of requests:
printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1
The server correctly prints, e.g.:
Accepted 127.0.0.1:34862
And the client receives, e.g. (replies can arrive out of order because each message is processed on its own thread):
Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"
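To run the handlers on multiple threads, the minimal changes are to accept each socket on its own strand and to replace the io_context with a thread pool: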
class server
{
public:
server(asio::any_io_executor ex, unsigned short port)
: acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(make_strand(acceptor_.get_executor()), [this](error_code ec, tcp::socket&& s) {
if (!ec) {
std::cout << "Accepted " << s.remote_endpoint() << std::endl;
std::make_shared<session>(std::move(s))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
void serverInit() {
try {
asio::thread_pool io_context;
server s(io_context.get_executor(), 8989);
io_context.join();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
}
Live demo:
Upvotes: 3