Daszek
Daszek

Reputation: 177

Using custom streambuf with boost::asio async operations

I'm writing a server application using boost::asio.
I am reading known amount of data from user and write it to a data sink which has following api:

class Sink {
    ...
    void write(const char *data, size_t size);
}

The data is big and it is possible to call write(..) multiple times to process one stream.
In my code I would like to call:

boost::asio::async_read(socket, sink_buffer,
    boost::asio::transfer_exactly(size), ...);

Is it possible to wrap Sink with custom std::streambuf or boost::asio::basic_streambuf so it could handle writing data parts to it?

Upvotes: 3

Views: 1915

Answers (1)

Tanner Sansbury
Tanner Sansbury

Reputation: 51871

For objects being passed as the buffers argument for async_read(), the buffers argument either :

  • needs to be a type that meets the MutableBufferSequence requirement.
  • be a boost::asio::basic_streambuf object.

Thus, it is possible to write a custom class that interacts with a Sink object when reading occurs. However, the boost::asio::basic_streambuf does not appear to be designed to serve as a base class.

If Sink::write is only an abstraction to the underlying memory, consider using an approach similar to basic_streambuf::prepare(), where a member function returns a handle to a buffer for a given size. The underlying memory implementation would still remain abstracted behind the mutable_buffer. For example:

boost::asio::async_read( socket, sink.buffer( size ), ... );

If Sink::write has business logic, such as performing logic branching based on the value of certain bytes, then it may be required to pass an intermediate buffer to async_read(). The invocation of Sink::write() with the intermediate buffer would then be done from within async_read()'s handler. For example:

void handle_read_into_sink( boost::system::error_code error,
                            std::size_t bytes_transferred,
                            boost::asio::ip::tcp::socket& socket,
                            Sink& sink,
                            char* buffer,
                            std::size_t buffer_size,
                            std::size_t bytes_remaining,
                            boost::function< void() > on_finish )
{
  sink.write( buffer, buffer_size );

  bytes_remaining -= bytes_transferred;
  // If there are more bytes remaining, then continue reading.
  if ( bytes_remaining )
  {
    read_into_sink( socket, sink, buffer, buffer_size,
                    bytes_remaining, on_finish );
  }
  // Otherwise, all data has been read.
  else
  {
    on_finish();
  }  
}

void read_into_sink( boost::asio::ip::tcp::socket& socket,
                     Sink& sink,
                     char* buffer,
                     std::size_t buffer_size,
                     std::size_t bytes_remaining,
                     boost::function< void() > on_finish )
{
  boost::asio::async_read(
    socket, boost::asio::buffer( buffer , buffer_size ),
    boost::asio::transfer_exactly( buffer_size ),
    boost::bind( handle_read_into_sink,
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred,
                 boost::ref( socket ),
                 boost::ref( sink ),
                 buffer,
                 buffer_size,
                 bytes_remaining,
                 on_finish ) );
}

And begin the async read loop with:

read_into_sink( socket, sink, small_buffer, sizeof_small_buffer, 
                total_stream_size, read_handler_callback );

Make sure to check and handle error based on your desired logic.

Upvotes: 1

Related Questions