davidA

Writing sections of data to TCP socket with boost::asio::ip::tcp from streambuf

Note for those down-voting: this question is not about the async aspects of asio (although perhaps an async solution might make sense here, which is a question I do offer up at the end). This is actually just about the use of streambuf and ostreams with the asio tcp socket wrapper. The examples/tutorial do not cover this particular aspect (sub-dividing a write call).

I'm writing some (hopefully simple) code for a plugin environment that needs to send a fairly large chunk of data (~2MB) to an external server in response to some event. The data needs to be sent fairly promptly, and completely, but it's infrequent and I'm not overly concerned about raw performance. I'm using Google's Protocol Buffers to serialize the data.

As of right now, I have the following code that almost works:

#include <boost/asio.hpp>

using boost::asio::ip::tcp;

// connect to the server:
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(server_address, server_port);
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::socket socket(io_service);
boost::asio::connect(socket, endpoint_iterator);

// float_array consists of ~500,000 floats in a ProtoBuf message:
//
// message FloatArray {
//   repeated float data = 1 [packed=true];
// }

// send the serialized float_array to the server:
boost::asio::streambuf b;
std::ostream os(&b);
float_array.SerializeToOstream(&os);
boost::asio::write(socket, b);

// the TCP connection *must* now close to signal the server

The problem with this is that the environment I'm working within (which is multithreaded) sees the write() operation as taking too long (it blocks), and terminates the thread. As I am not permitted to create my own threads, I need to split up the write() operation into a number of separate writes.

My concern is how to do this. I know that I can use this to send an exact number of bytes:

boost::asio::write(socket, b, boost::asio::transfer_exactly(65536));

But to do this correctly I need to know exactly how many bytes remain in the ostream. I notice that b.size() shrinks accordingly after each write, so I could use that. However, in order to split my writes up, I need to store some state between calls to this function.

One option I have is to store both the streambuf b and the ostream os between calls to my writing function, but I'm wondering if there is a better way to do this. As far as I can tell it's not possible to partially serialize the ProtoBuf output, so I think I'm stuck with the one call to float_array.SerializeToOstream(). The question then is whether there's a proper way to query the ostream directly for the number of available bytes, or possibly to make use of some other mechanism (boost::asio::buffer maybe?).

I'm happy to review the boost::asio documentation myself, I'm just looking for a little guidance on how to proceed, please, as there's a lot of documentation to work through and I'm not sure which parts of the puzzle are relevant.

A thought - is it possible to use boost::asio to create some sort of single-threaded asynchronous "sender" along these lines that can handle this sort of state for me? For example, can I call some sort of non-blocking write() function and then have some sort of callback (or frequently visited function) that checks for completion and then closes the TCP connection?

Answers (1)

davidA

Although I wasn't able to find a way to query an ostream to determine the amount of data 'waiting' in the stream directly, I was able to avoid ostreams entirely and serialize the data into a char* array. This can then be sent in a similar way to the old-style C socket method with the boost::asio::write() function:

...
tcp::socket socket(io_service);
char * buffer = new char[size];  // size: serialized message size in bytes; or use a smart-ptr
float_array.SerializeToArray(static_cast<void*>(buffer), size);
void * p = static_cast<void*>(buffer);
// bytes_to_send: number of bytes to write in this call (at most size)
std::size_t bytes_sent = boost::asio::write(socket, boost::asio::buffer(p, bytes_to_send));

Alternatively, if the use of boost::asio::streambuf and std::ostream is preferred, then it appears that the streambuf can be queried (with .size()) after the ostream has been used to write some data:

boost::asio::streambuf b;
std::ostream os(&b);
float_array.SerializeToOstream(&os);

// send a chunk of a particular size
std::size_t bytes_to_send = std::min<std::size_t>(chunk_size, b.size());
std::cout << b.size() << std::endl;  // shows amount of remaining data
boost::asio::write(socket, b, boost::asio::transfer_exactly(bytes_to_send));
std::cout << b.size() << std::endl;  // shows a reduction in amount of remaining data

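So if this is invoked multiple times (once per chunk), the streambuf, the socket and the io_service need to be kept alive between calls. The ostream is only needed for the serialization step, since the serialized bytes live in the streambuf.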
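The state that has to survive between calls can be bundled into one small object. What follows is only a sketch of the array-based variant, not part of boost::asio: ChunkedSender, SendFn, send_next and remaining are hypothetical names, and send_fn is a stand-in for the blocking socket write so that the chunking logic can be shown without a live connection.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper (not a boost::asio type): owns the serialized bytes
// and remembers how far the previous calls got.
class ChunkedSender {
public:
    // Stand-in for the blocking socket write; it is expected to send
    // exactly `len` bytes starting at `data`.
    using SendFn = std::function<void(const char* data, std::size_t len)>;

    ChunkedSender(std::string payload, std::size_t chunk_size)
        : payload_(std::move(payload)),
          // a chunk size of 0 would never make progress, so use at least 1
          chunk_size_(std::max<std::size_t>(chunk_size, 1)),
          offset_(0) {}

    // Sends at most one chunk. Returns true once the whole payload is sent.
    bool send_next(const SendFn& send_fn) {
        const std::size_t n = std::min(chunk_size_, remaining());
        if (n > 0) {
            send_fn(payload_.data() + offset_, n);
            offset_ += n;
        }
        return offset_ == payload_.size();
    }

    // Number of bytes that have not been handed to send_fn yet.
    std::size_t remaining() const { return payload_.size() - offset_; }

private:
    std::string payload_;     // serialized message bytes
    std::size_t chunk_size_;  // upper bound on bytes per call
    std::size_t offset_;      // start of the next chunk
};
```

In the real plugin the payload would presumably come from float_array.SerializeToString(&payload), and send_fn would wrap boost::asio::write(socket, boost::asio::buffer(data, len)). Each visit to the writing function then calls send_next() once, and the socket is closed when it returns true.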
