YesButWhy

Reputation: 73

How to access the contents of the boost::asio::streambuf in the async_write() handler

I need to log the contents of a boost::asio::streambuf in the async_write() handler, after the data has been sent by that same async_write(). However, although streambuf::size() returns 95 before async_write(), it returns 0 in the handler, even though the buffer still holds exactly the data that was sent. That makes logging impossible, because the handler does not know the buffer size (how many characters to log).

As far as I understand, the problem is that the "write" operation advances the internal pointers of the streambuf, so the data in the buffer is treated as "invalidated" after being sent. That's why streambuf::size() returns 95 before async_write() but 0 in the async_write() handler.

I also noticed that the buffer still contains the needed content in the async_write() handler. One could suggest saving the size of the buffer before sending and reusing it in the handler. Still, I assume I cannot rely on the content always being available in the handler, since the streambuf implementation may discard it whenever it sees fit. Intuitively, such an approach feels unreliable.

Are there any workarounds to safely print the buffer content into the log in the async_write() handler?

// Message class.
struct MyTcpMsg {
...
public:
  // Buffer "getter".
  boost::asio::streambuf& buffer();

private:
  // Buffer that keeps the data that is going to be sent over the network.
  boost::asio::streambuf m_buffer;
};

// Message queue.
std::deque<std::unique_ptr<MyTcpMsg>> m_messagesOut;
//.. fill m_messagesOut queue with messages... (code omitted)

// Code that executes sending the message. 
// Attempting to log the sent data in the handler.
auto& msgStream = m_messagesOut.front()->buffer();
// msgStream.size() is 95 here.
boost::asio::async_write(
    getSocket(), msgStream,
    [this](boost::system::error_code ec, std::size_t length) {
      if (!ec) {
        auto& msgStreamOut = m_messagesOut.front()->buffer();
        // Log important info. But the buffer size
        // in (msgStreamOut.size()) is 0 now, which makes logging impossible,
        // although the data is still present in the buffer. 
        printBufferToLog(msgStreamOut, msgStreamOut.size());
      }
    });

Thanks in advance

Upvotes: 1

Views: 239

Answers (1)

sehe

Reputation: 392893

Yeah. You correctly understood how a DynamicBuffer operates: the write consumes the data it has sent. If you don't want that, use a non-dynamic buffer or a sequence of buffers.

The good news is that you can get a buffer sequence instance from the streambuf with no effort at all:

auto& sb = m_messagesOut.front()->buffer();
asio::const_buffers_1 buf = sb.data();

So you can update your code:

void stub_send_loop() {
    auto& sb = m_messagesOut.front()->buffer();
    asio::const_buffers_1 buf = sb.data();
    async_write(getSocket(), buf, [=, &sb](error_code ec, size_t length) {
        if (!ec) {
            // Log important info
            (std::cout << "Wrote : ").write(buffer_cast<char const*>(buf), length) << std::endl;

            // update stream
            sb.consume(length);
        }
    });
}

Side-note: The exact type of buf is a bit of an implementation detail. I recommend depending on it indirectly to make sure that the implementation of the streambuf's buffer sequence is guaranteed to be a single buffer. async_write doesn't care, but your logging code might (as shown). See also "Is it safe to use boost::asio::streambuf as both an istream and an array as string_view?"


Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <memory> // std::unique_ptr, std::make_unique
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;

struct MyTcpMsg {
    asio::streambuf& buffer() { return m_buffer; }

    template <typename... T> MyTcpMsg(T const&... args) {
        (std::ostream(&m_buffer) << ... << args);
    }

  private:
    asio::streambuf m_buffer;
};

struct X {
    asio::io_context io;
    tcp::socket      sock_{io};

    std::deque<std::unique_ptr<MyTcpMsg>> m_messagesOut;

    X() {
        m_messagesOut.push_back(std::make_unique<MyTcpMsg>("Hello world: ", 21 * 2, "."));
        m_messagesOut.push_back(std::make_unique<MyTcpMsg>("Bye"));
    };

    tcp::socket& getSocket() {
        if (!sock_.is_open())
            sock_.connect({{}, 7878});

        return sock_;
    }

    void stub_send_loop() {
        auto& sb = m_messagesOut.front()->buffer();
        asio::const_buffers_1 buf = sb.data();
        async_write(getSocket(), buf, [=, &sb](error_code ec, size_t length) {
            if (!ec) {
                // Log important info
                (std::cout << "Wrote : ").write(buffer_cast<char const*>(buf), length) << std::endl;

                // update stream
                sb.consume(length);
            }
        });
    }
};

int main() {
    X x;
    x.stub_send_loop();
    x.io.run(); // run the event loop so the completion handler is invoked
}


Side Note

I think you might want to rethink your design a little. Likely, the use of streambuf is a pessimization. You could "just" return a buffer sequence, which may allow you to avoid allocation. Also, the fact that you expose it by mutable reference (via a quasi-class "getter") breaks encapsulation.

Upvotes: 2
