Reputation: 610
I'm using a strand to avoid concurrent writes on a TCP server using Boost.Asio, but it seems it only prevents concurrent execution of handlers.
Indeed, if I issue two successive async_write calls, one with a very big packet and the other with a very small one, Wireshark shows that they interleave. Since async_write is composed of multiple calls to async_write_some, it seems that the handler of my second write is allowed to run between two handlers of the first call, which is very bad for me.
Wireshark output: [Packet 1.1] [Packet 1.2] [Packet 2] [Packet 1.3] ... [Packet 1.x]
struct Command
{
    // Header
    uint64_t ticket_id;  // UUID
    uint32_t data_size;  // size of data
    // Data
    std::vector<unsigned char> m_internal_buffer;
};
typedef std::shared_ptr<Command> command_type;
void tcp_server::write(command_type cmd)
{
    boost::asio::async_write(m_socket, boost::asio::buffer(cmd->getData(), cmd->getTotalPacketSize()),
        boost::asio::bind_executor(m_write_strand,
            [this, cmd](const boost::system::error_code& error, std::size_t bytes_transferred)
            {
                if (error)
                {
                    // report
                }
            }
        )
    );
}
and the main:
int main()
{
    tcp_server.write(big_packet);   // Packet 1 = 10 MBytes !
    tcp_server.write(small_packet); // Packet 2 = 64 kbytes
}
Is a strand not appropriate in my case?
P.S.: I saw a closely related topic here, but in my opinion it does not cover the same use case.
Upvotes: 1
Views: 1548
Reputation: 392979
You have to make sure your async operation is initiated from the strand. Your code currently doesn't show this to be the case. Hopefully this helps; otherwise, post an MCVE.
So, e.g.:
void tcp_server::write(command_type cmd)
{
    post(m_write_strand, [this, cmd] { this->do_write(cmd); });
}
Making up an MCVE from your question code:
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
using Executor = boost::asio::thread_pool::executor_type;

// Stand-in for the question's Command type
struct command {
    char const* getData() const { return ""; }
    size_t getTotalPacketSize() const { return 1; }
};
using command_type = command*;

struct tcp_server {
    tcp_server(Executor ex) : m_socket(ex), m_write_strand(ex)
    {
        // more?
    }
    void write(command_type cmd);
    void do_write(command_type cmd);
    tcp::socket m_socket;
    boost::asio::strand<Executor> m_write_strand;
};

// Hop onto the strand before initiating the write
void tcp_server::write(command_type cmd)
{
    post(m_write_strand, [this, cmd] { this->do_write(cmd); });
}

// Runs on the strand; the completion handler is bound to the same strand
void tcp_server::do_write(command_type cmd)
{
    boost::asio::async_write(
        m_socket,
        boost::asio::buffer(cmd->getData(), cmd->getTotalPacketSize()),
        bind_executor(m_write_strand,
                      [/*this, cmd*/](boost::system::error_code error,
                                      size_t /*bytes_transferred*/) {
                          if (error) {
                              // report
                          }
                      }));
}

int main() {
    boost::asio::thread_pool ioc;
    tcp_server tcp_server(ioc.get_executor());
    // Real objects rather than null pointers, since do_write dereferences cmd
    command big, small;
    command_type big_packet = &big, small_packet = &small;
    tcp_server.write(big_packet);   // Packet 1 = 10 MBytes !
    tcp_server.write(small_packet); // Packet 2 = 64 kbytes
    ioc.join();
}
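One caveat worth spelling out: a strand serializes handlers, but the Boost.Asio documentation for async_write also requires that no other write operation be started on the same stream until the current one completes. Two async_write calls started back to back therefore still overlap, even when both are initiated from the strand. A common remedy is an outgoing queue that keeps only one composed write in flight. The following is a minimal sketch of that idea, deliberately written without Boost so the queue discipline is visible on its own. The names write_queue, push, pending and start_write_fn are hypothetical, not part of Boost.Asio; in a real server the start_write callable would presumably wrap boost::asio::async_write with a handler bound to m_write_strand, and push would only be called from that strand.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <system_error>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Boost.Asio): sends whole messages one at a
// time, so that at most one composed write is in flight on the stream.
class write_queue {
public:
    using buffer_type = std::vector<unsigned char>;
    using completion = std::function<void(std::error_code, std::size_t)>;
    // Starts ONE asynchronous write of the whole buffer and invokes the
    // completion exactly once when it finishes. Assumption: in a real server
    // this wraps boost::asio::async_write bound to the write strand.
    using start_write_fn = std::function<void(const buffer_type&, completion)>;

    explicit write_queue(start_write_fn start) : start_write_(std::move(start)) {}

    // Not thread-safe by itself: call it from the strand only.
    void push(buffer_type message) {
        const bool idle = pending_.empty();
        pending_.push_back(std::move(message));
        if (idle) start_next();  // nothing in flight, so start this write now
    }

    std::size_t pending() const { return pending_.size(); }

private:
    void start_next() {
        // The front element stays in the deque until its write completes,
        // which keeps the buffer alive for the whole asynchronous operation.
        start_write_(pending_.front(), [this](std::error_code ec, std::size_t) {
            assert(!pending_.empty());
            pending_.pop_front();
            if (ec) {
                pending_.clear();  // on error, drop whatever is still queued
                return;
            }
            if (!pending_.empty()) start_next();  // chain the next message
        });
    }

    start_write_fn start_write_;
    std::deque<buffer_type> pending_;
};
```

With this shape, tcp_server::write would post a push call onto the strand instead of calling async_write directly, so the small packet is only started after the completion handler of the big packet has run. The sketch clears the queue on error; whether to do that, retry, or close the socket is a design choice the question does not settle.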
Upvotes: 3