Reputation: 933
I try to do a simple tcp/http server with boost asio and boost beast.
But, when I try to read the socket message, I have bad file descritor
.
I don't really understand what is wrong. I transfer the socket from the server class to detect_session class with std::move to get the same "socket"
Server
tcp_server::tcp_server(boost::asio::io_context& ioc, tcp::endpoint endpoint,
std::shared_ptr<std::string const> const& doc_root)
: acceptor(ioc, endpoint),
doc_root(doc_root)
{
wait_for_connection();
}
void tcp_server::wait_for_connection()
{
acceptor.async_accept(
[this](boost::system::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::cout << "accepted" << std::endl;
std::make_shared<detect_session>(std::move(socket), std::move(buffer),
doc_root)->run();
}
wait_for_connection();
});
}
detect_session.h
#ifndef DETECT_SESSION_H
#define DETECT_SESSION_H
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/make_unique.hpp>
#include "message.h"
#include "http_session.h"
#include "tcp_connexion.h"
#include "logger.h"
using tcp = boost::asio::ip::tcp;
namespace ssl = boost::asio::ssl;
namespace http = boost::beast::http;
namespace websocket = boost::beast::websocket ;
class detect_session: public std::enable_shared_from_this<detect_session>
{
boost::asio::strand<boost::asio::io_context::executor_type> strand;
tcp::socket m_socket;
std::shared_ptr<std::string const> doc_root;
public:
detect_session(tcp::socket socket, boost::beast::flat_buffer buffer,std::shared_ptr<std::string const> const& doc_root);
~detect_session();
void run();
void handshake();
void on_handshake();
void do_read();
void on_read(boost::system::error_code ec);
http::request<http::string_body> req;
protected:
boost::beast::flat_buffer buffer;
connection_ptr m_tcp_connection;
private:
void on_timer(boost::system::error_code ec);
boost::asio::steady_timer timer;
void do_timeout();
void checkGETVerb(boost::system::error_code ec);
void do_eof();
message message_read;
};
#endif // DETECT_SESSION_H
detect_session.cpp
detect_session::detect_session(tcp::socket socket,
boost::beast::flat_buffer buffer,std::shared_ptr<std::string const> const&
doc_root)
: m_socket(std::move(socket))
, strand(socket.get_executor())
, timer(socket.get_executor().context(),
(std::chrono::steady_clock::time_point::max)())
, doc_root(doc_root)
{
}
detect_session::~detect_session()
{
//dtor
}
void detect_session::run()
{
if(! strand.running_in_this_thread())
return
boost::asio::post(boost::asio::bind_executor(strand,std::bind(&detect_session::run, shared_from_this())));
on_timer({});
do_read();
}
void detect_session::on_timer(boost::system::error_code ec)
{
if(ec && ec != boost::asio::error::operation_aborted)
// return fail(ec, "timer");
// Verify that the timer really expired since the deadline may have moved.
if(timer.expiry() <= std::chrono::steady_clock::now())
return do_timeout();
// Wait on the timer
timer.async_wait(
boost::asio::bind_executor(
strand,
std::bind(
&detect_session::on_timer,
shared_from_this(),
std::placeholders::_1)));
}
void detect_session::do_read()
{
timer.expires_after(std::chrono::seconds(15));
//boost::asio::io_context &ioc = m_socket.get_executor().context();
m_tcp_connection = connection_ptr(new tcp_connection(m_socket.get_executor().context()));
m_tcp_connection->async_read(message_read, boost::bind(&detect_session::on_read, this, boost::asio::placeholders::error) );
}
void detect_session::on_read(boost::system::error_code ec)
{
// Happens when the timer closes the socket
if(ec == boost::asio::error::operation_aborted)
return;
// This means they closed the connection
if(ec == http::error::end_of_stream)
do_eof();
if(ec)
return log.fail(ec, "read");
std::cout<< <<message_read.m_message<< std::endl;
if(message_read.tcp == 0)
{
// See if it is a HTTP session
req = {};
http::async_read(m_socket, buffer, req, boost::asio::bind_executor(strand, std::bind(&detect_session::checkGETVerb, this, std::placeholders::_1) ));
}
// else
/*std::make_shared<connection_ptr>(
std::move(socket.get_executor().context()));*/
}
void detect_session::checkGETVerb(boost::system::error_code ec)
{
if (req.method() == http::verb::get)
{
std::cout<<req << std::endl;
}
}
void detect_session::do_timeout()
{
// Closing the socket cancels all outstanding operations. They
// will complete with boost::asio::error::operation_aborted
boost::system::error_code ec;
m_socket.shutdown(tcp::socket::shutdown_both, ec);
m_socket.close(ec);
}
void detect_session::do_eof()
{
// Send a TCP shutdown
boost::system::error_code ec;
m_socket.shutdown(tcp::socket::shutdown_send, ec);
std::cout<< "socket closed"<<std::endl;
// At this point the connection is closed gracefully
}
tcp_connection.h
#include <boost/tuple/tuple.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/array.hpp>
#include <iostream>
class tcp_connection
{
public:
tcp_connection(boost::asio::io_context& io_context) : m_socket(io_context)
{
}
boost::asio::ip::tcp::socket& socket()
{
return m_socket;
}
template <typename T, typename Handler>
void async_write(const T& t, Handler handler)
{
// Serialize the data first so we know how large it is.
std::ostringstream archive_stream;
boost::archive::text_oarchive archive(archive_stream);
archive << t;
m_outbound_data = archive_stream.str();
// Format the header.
std::ostringstream header_stream;
header_stream << std::setw(header_length)
<< std::hex << m_outbound_data.size();
if (!header_stream || header_stream.str().size() != header_length)
{
// Something went wrong, inform the caller.
boost::system::error_code error(boost::asio::error::invalid_argument);
m_socket.get_io_service().post(boost::bind(handler, error));
return;
}
m_outbound_header = header_stream.str();
// Write the serialized data to the socket. We use "gather-write" to send
// both the header and the data in a single write operation.
std::vector<boost::asio::const_buffer> buffers;
buffers.push_back(boost::asio::buffer(m_outbound_header));
buffers.push_back(boost::asio::buffer(m_outbound_data));
boost::asio::async_write(m_socket, buffers, handler);
}
/// Asynchronously read a data structure from the socket.
template <typename T, typename Handler>
void async_read(T& t, Handler handler)
{
// Issue a read operation to read exactly the number of bytes in a header.
void (tcp_connection::*f)(
const boost::system::error_code&,
T&, boost::tuple<Handler>)
= &tcp_connection::handle_read_header<T, Handler>;
boost::asio::async_read(m_socket, boost::asio::buffer(m_inbound_header),
boost::bind(f,
this, boost::asio::placeholders::error, boost::ref(t),
boost::make_tuple(handler)));
}
/// Handle a completed read of a message header. The handler is passed using
/// a tuple since boost::bind seems to have trouble binding a function object
/// created using boost::bind as a parameter.
template <typename T, typename Handler>
void handle_read_header(const boost::system::error_code& e,
T& t, boost::tuple<Handler> handler)
{
if (e)
{
boost::get<0>(handler)(e);
}
else
{
// Determine the length of the serialized data.
std::istringstream is(std::string(m_inbound_header, header_length));
std::size_t m_inbound_datasize = 0;
if (!(is >> std::hex >> m_inbound_datasize))
{
// Header doesn't seem to be valid. Inform the caller.
boost::system::error_code error(boost::asio::error::invalid_argument);
boost::get<0>(handler)(error);
return;
}
// Start an asynchronous call to receive the data.
m_inbound_data.resize(m_inbound_datasize);
void (tcp_connection::*f)(
const boost::system::error_code&,
T&, boost::tuple<Handler>)
= &tcp_connection::handle_read_data<T, Handler>;
boost::asio::async_read(m_socket, boost::asio::buffer(m_inbound_data),
boost::bind(f, this,
boost::asio::placeholders::error, boost::ref(t), handler));
}
}
/// Handle a completed read of message data.
template <typename T, typename Handler>
void handle_read_data(const boost::system::error_code& e,
T& t, boost::tuple<Handler> handler)
{
if (e)
{
boost::get<0>(handler)(e);
}
else
{
// Extract the data structure from the data just received.
try
{
std::string archive_data(&m_inbound_data[0], m_inbound_data.size());
std::istringstream archive_stream(archive_data);
boost::archive::text_iarchive archive(archive_stream);
archive >> t;
}
catch (std::exception& e)
{
// Unable to decode data.
boost::system::error_code error(boost::asio::error::invalid_argument);
boost::get<0>(handler)(error);
return;
}
// Inform caller that data has been received ok.
boost::get<0>(handler)(e);
}
}
private:
/// The underlying socket.
boost::asio::ip::tcp::socket m_socket;
/// The size of a fixed length header.
enum { header_length = 8 };
/// Holds an outbound header.
std::string m_outbound_header;
/// Holds the outbound data.
std::string m_outbound_data;
/// Holds an inbound header.
char m_inbound_header[header_length];
/// Holds the inbound data.
std::vector<char> m_inbound_data;
boost::array<char, 128> m_network_buffer;
};
typedef boost::shared_ptr<tcp_connection> connection_ptr;
#endif // TCP_CONNECTION_H
message.h
#ifndef MESSAGE_H
#define MESSAGE_H
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/list.hpp>
class message
{
public:
void reset()
{
m_list_string.clear();
m_message.clear();
m_login.clear();
}
int m_type;
// Generic datas
std::list<std::string> m_list_string;
std::string m_message;
std::string m_login;
bool tcp;
template<class Archive>
void serialize(Archive& ar, const unsigned int version){
ar & m_type & m_list_string & m_message & m_login;
}
enum {
NEW_MSG = 0,
PERSON_LEAVED = 1,
PERSON_CONNECTED = 2,
};
};
#endif // MESSAGE_H
Client side :
tcp_client::tcp_client(boost::asio::io_context& io_context): m_io_context(io_context), socket(io_context)
{
}
tcp_client::~tcp_client()
{
}
void tcp_client::run()
{
boost::asio::io_context io_context;
tcp::resolver resolver(io_context);
const tcp::resolver::results_type endpoint = resolver.resolve("192.168.9.129", "4000");
m_tcp_connection = connection_ptr(new tcp_connection(m_io_context));
tcp::socket& sock = m_tcp_connection->socket();
boost::asio::async_connect(sock, endpoint,
[this](boost::system::error_code ec, tcp::endpoint)
{
if (!ec)
{
write(QString("Welcome !"));
}
});
}
// Close the connection
void tcp_client::close()
{
m_io_context.post(boost::bind(&tcp_client::do_close, this));
}
void tcp_client::handle_read(const boost::system::error_code& error)
{
std::cout<<"12"<<std::endl;
if (!error)
{
notify(m_message_read);
m_tcp_connection->async_read(m_message_read,
boost::bind(&tcp_client::handle_read, this,
boost::asio::placeholders::error)
);
}
else
{
do_close();
}
}
void tcp_client::write(QString msg)
{
message e;
e.m_type = message::NEW_MSG;
e.m_login = m_login;
e.m_message = msg.toStdString();
e.tcp = 1;
write(e);
}
void tcp_client::write(message& e)
{
m_tcp_connection->async_write(e,
boost::bind(&tcp_client::handle_write, this,
boost::asio::placeholders::error)
);
}
tcp_connection.h and message.h are the same into client and server side
Upvotes: 1
Views: 2849
Reputation: 392979
This is by design. If you close
a socket, the handle (descriptor) stops being valid, so any operation started after that would complain about the handle not being valid.
(worse, it could be reused for a new file/connection and you might end up talking to the wrong party, causing undefined behaviour or data corruption. I've seen a bug like that live. Not fun to debug)
You probably want to /only/ do shutdown
, which will cause all pending/new operations to fail, but the handle is still valid. Then when the socket
instance is destructed, it will do the close
call automatically, and safely, without the race condition.
Upvotes: 0