poukill
poukill

Reputation: 610

Boost.Asio async_write and strands

I 'm using a strand to avoid concurrent writes on TCP server using Boost.Asio. But it seems it only prevents concurrent execution of handlers.

Indeed if I do two successive async_write, one with a very big packet, and the other with a very small one, wireshark shows interleaves. As async_write is composed of multiple calls of async_write_some, it seems that the handler of my second write is allowed to be executed between two handlers of the first call. Which is very bad for me.

Wireshark output: [Packet 1.1] [Packet 1.2] [Packet 2] [Packet 1.3] ... [Packet 1.x]

struct Command
{
    // Header
    uint64_t ticket_id; // UUID
    uint32_t data_size; // size of data

    // data
    std::vector<unsigned char> m_internal_buffer;
}
typedef std::shared_ptr<Command> command_type;
void tcp_server::write(command_type cmd)
{
    boost::asio::async_write(m_socket, boost::asio::buffer(cmd->getData(), cmd->getTotalPacketSize()),
        boost::asio::bind_executor(m_write_strand,
            [this, cmd](const boost::system::error_code& error, std::size_t bytes_transferred)
            {
                if (error)
                {
                   // report
                }
            }
        )
    );
}

and the main:

int main()
{
    tcp_server.write(big_packet); // Packet 1 = 10 MBytes !
    tcp_server.write(small_packet); // Packet 2 = 64 kbytes
}

Is the strand not appropriate in my case ?

P.S: I saw that close topic here but it does not cover the same use case in my opinion.

Upvotes: 1

Views: 1548

Answers (1)

sehe
sehe

Reputation: 392979

You have to make sure your async operation is initiated from the strand. Your code currently doesn't show this to be the case. Hopefully this helps, otherwise, post a MCVE

So e.g.

void tcp_server::write(command_type cmd)
{
    post(m_write_strand, [this, cmd] { this->do_write(cmd); });
}

Making up a MCVE from your question code:

Live On Coliru

#include <boost/asio.hpp>
using boost::asio::ip::tcp;
using Executor = boost::asio::thread_pool::executor_type;

struct command {
    char const* getData()            const { return ""; }
    size_t      getTotalPacketSize() const { return 1;  }
};
using command_type = command*;

struct tcp_server {
    tcp_server(Executor ex) : m_socket(ex), m_write_strand(ex)
    {
        // more?
    }
    void write(command_type cmd);
    void do_write(command_type cmd);

    tcp::socket m_socket;
    boost::asio::strand<Executor> m_write_strand;
};

void tcp_server::write(command_type cmd)
{
    post(m_write_strand, [this, cmd] { this->do_write(cmd); });
}

void tcp_server::do_write(command_type cmd)
{
    boost::asio::async_write(
        m_socket,
        boost::asio::buffer(cmd->getData(), cmd->getTotalPacketSize()),
        bind_executor(m_write_strand,
                      [/*this, cmd*/](boost::system::error_code error,
                                      size_t bytes_transferred) {
                          if (error) {
                              // report
                          }
                      }));
}

int main() {
    boost::asio::thread_pool ioc;
    tcp_server tcp_server(ioc.get_executor());

    command_type big_packet{}, small_packet{};
    tcp_server.write(big_packet);   // Packet 1 = 10 MBytes !
    tcp_server.write(small_packet); // Packet 2 = 64 kbytes

    ioc.join();
}

Upvotes: 3

Related Questions