user9414424
user9414424

Reputation: 509

Sequential communication failed with boost::asio::streambuf in binary way?

Using boost::asio, I'm coding network stuff. I tried to build a simple send-and-receive-string protocol. The sender first send the string size to the receiver. Then the sender sends the actual string to the receiver.

In particular, I designed the following two protocols.

I built the above protocols as shown below: If I execute this protocol once, that works fine. However, if i execute this protocol more than once (e.g. two times), the string size that the receiver receives gets wrong.

First time : 1365 bytes.

Second time : 779073 bytes. (just read not 779073 but 7790)

I found that os << data_size is not done in a binary way. "779073" is just sent as 6 bytes string. But the receiver just reads 4bytes of it. How to send a binary data and to receive a binary data using boost::asio and boost::asio::streambuf?

Receiver

// socket is already defined
// ** first step: recv data size
boost::asio::streambuf buf;
boost::asio::read(
   socket,
   buf, 
   boost::asio::transfer_exactly(sizeof(uint32_t))
);
std::istream iss(&buf);
uint32_t read_len;
iss >>  read_len;

// ** second step: recv payload based on the data size
boost::asio::streambuf buf2;
read_len = boost::asio::read(socket, buf2, 
boost::asio::transfer_exactly(read_len), error);
cout << "  read "<< read_len << " bytes payload" << endl; 
std::istream is_payload(&buf2);
std::string str;
is_payload >> str;
cout << str << endl; 

Sender

// socket is already defined
string str=...;   // some string to be sent
// ** first step: tell the string size to the reciever
uint32_t data_size = str.size();
boost::asio::streambuf send_buf;
std::ostream os(&send_buf);
os << data_size;
size_t sent_byte = boost::asio::write(socket, send_buf.data());
cout << sent_byte << endl; // debug purpose

// ** second step: send the actual string (payload)
sent_byte = boost::asio::write(socket, boost::asio::buffer(reinterpret_cast<const char*>(&str[0]), data_size));
cout << sent_byte << endl; // debug purpose

Upvotes: 1

Views: 254

Answers (1)

sehe
sehe

Reputation: 393769

You can send the size binary, but that requires you to take architectural differences between devices and operating systems into account¹.

Here's my take on actually coding the protocol reusably:

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>

namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;

namespace Protocol { // your library

    using net_size_t = boost::endian::big_int32_t; // This protocol uses Big-endian network byte order

    template <typename Derived, typename Token, typename Sig = void(error_code, size_t)> 
    struct base_async_op : std::enable_shared_from_this<Derived> {
        using base_type = base_async_op<Derived, Token, Sig>;

        template <typename DeducedToken>
        base_async_op(DeducedToken &&token) : _token(std::forward<DeducedToken>(token)) {}

        using _Token   = std::decay_t<Token>;
        using _Init    = ba::async_completion<_Token, Sig>;
        using _Handler = typename _Init::completion_handler_type;

        _Token _token;
        _Init _init {_token};

        auto get_allocator() const noexcept { 
            return (boost::asio::get_associated_allocator)(_init.completion_handler);
        }
        using executor_type = ba::associated_executor_t<_Handler>;
        executor_type get_executor() const noexcept {
            return (boost::asio::get_associated_executor)(_init.completion_handler);
        }

        Derived& derived()             { return static_cast<Derived&>(*this);       } 
        Derived const& derived() const { return static_cast<Derived const&>(*this); } 

        template <typename F>
        auto wrap(F&& f) const {
            //std::cout << "WRAP: " << typeid(derived().get_executor()).name() << "\n";
            return ba::bind_executor(derived().get_executor(), std::forward<F>(f));
        }
    };

    template <typename Derived, typename Stream, typename Token, typename Sig = void(error_code, size_t)> 
    struct stream_async_op : base_async_op<Derived, Token, Sig> {
        using base_type = stream_async_op<Derived, Stream, Token, Sig>;

        template <typename DeducedToken>
        stream_async_op(Stream& s, DeducedToken &&token) : base_async_op<Derived, Token, Sig>(std::forward<DeducedToken>(token)), _stream(s)  {}

        Stream& _stream;

        using executor_type = ba::associated_executor_t<typename stream_async_op::_Handler, decltype(std::declval<Stream>().get_executor())>;
        executor_type get_executor() const noexcept {
            return (boost::asio::get_associated_executor)(this->_init.completion_handler, _stream.get_executor());
        }
    };

    template <typename AsyncStream, typename Buffer, typename Token>
    auto async_transmit(AsyncStream& s, Buffer message_buffer, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            net_size_t _length[1];

            auto run(Buffer buffer) {
                auto self = this->shared_from_this();
                _length[0] = ba::buffer_size(buffer);

                ba::async_write(_stream, std::vector<ba::const_buffer> { ba::buffer(_length), buffer },
                    this->wrap([self,this](error_code ec, size_t transferred) { _init.completion_handler(ec, transferred); }));

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run(message_buffer);
    }

    template <typename AsyncStream, typename Buffer, typename Token>
    auto async_receive(AsyncStream& s, Buffer& output, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            net_size_t _length[1] = {0};

            auto run(Buffer& output) {
                auto self = this->shared_from_this();

                ba::async_read(_stream, ba::buffer(_length), this->wrap([self, this, &output](error_code ec, size_t transferred) {
                    if (ec)
                        _init.completion_handler(ec, transferred);
                    else
                        ba::async_read(_stream, ba::dynamic_buffer(output), ba::transfer_exactly(_length[0]),
                            this->wrap([self, this](error_code ec, size_t transferred) { 
                                _init.completion_handler(ec, transferred);
                            }));
                }));

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run(output);
    }

    template <typename Output = std::string, typename AsyncStream, typename Token>
    auto async_receive(AsyncStream& s, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token, void(error_code, Output)> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            Output _output;
            net_size_t _length[1] = {0};

            auto run() {
                auto self = this->shared_from_this();

                ba::async_read(_stream, ba::buffer(_length), [self,this](error_code ec, size_t) {
                        if (ec)
                            _init.completion_handler(ec, std::move(_output));
                        else
                            ba::async_read(_stream, ba::dynamic_buffer(_output), ba::transfer_exactly(_length[0]),
                                [self,this](error_code ec, size_t) { _init.completion_handler(ec, std::move(_output)); });
                    });

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run();
    }

} // Protocol

#include <iostream>
#include <iomanip>

int main() {

    ba::io_context io;
    tcp::socket sock(io);
    sock.connect({tcp::v4(), 6767});

    auto cont = [](auto name, auto continuation = []{}) { return [=](error_code ec, size_t transferred) {
        std::cout << name << " completed (" << transferred << ", " << ec.message() << ")\n";
        if (!ec) continuation();
    }; };
    auto report = [=](auto name) { return cont(name, []{}); };

    // send chain
    std::string hello = "Hello", world = "World";
    Protocol::async_transmit(sock, ba::buffer(hello),
            cont("Send hello", [&] { Protocol::async_transmit(sock, ba::buffer(world), report("Send world")); }
        ));
#ifndef DEMO_USE_FUTURE
    // receive chain
    std::string object1, object2;
    Protocol::async_receive(sock, object1,
            cont("Read object 1", [&] { Protocol::async_receive(sock, object2, report("Read object 2")); }));

    io.run();

    std::cout << "Response object 1: " << std::quoted(object1) << "\n";
    std::cout << "Response object 2: " << std::quoted(object2) << "\n";
#else
    // also possible, alternative completion mechanisms:
    std::future<std::string> fut = Protocol::async_receive(sock, ba::use_future);
    io.run();

    std::cout << "Response object: " << std::quoted(fut.get()) << "\n";
#endif

}

When talking to a test server like:

xxd -p -r <<< '0000 0006 4e6f 2077 6179 0000 0005 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd

The program prints

Send hello completed (9, Success)
Send world completed (9, Success)
Read object 1 completed (6, Success)
Read object 2 completed (5, Success)
Response object 1: "No way"
Response object 2: "José"

And the netcat side prints:

00000000: 0000 0005 4865 6c6c 6f00 0000 0557 6f72  ....Hello....Wor
00000010: 6c64                                     ld

Enabling handler tracking allows you to use handlerviz.pl to visualize the call chains: enter image description here

Note You can change big_int32_t to little_int32_t without any further change. Of course, you should change the payload on the server side to match:

xxd -p -r <<< '0600 0000 4e6f 2077 6179 0500 0000 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd

¹ Endianness, e.g. using Boost Endian or ::ntohs, ::ntohl, ::htons and ::htonl

Upvotes: 2

Related Questions