Reputation: 8209
I'm trying to implement protobuf sending/receiving that's compatible with the Java version, which prefixes each message with its size as a varint32.
I've almost got it working, but for some reason some messages become partial and fail an assert().
/receiver.cpp:69: void tcp_connection::handle_read_message(const boost::system::error_code&, size_t): Assertion `line.ParseFromCodedStream(&input)' failed.
sender.cpp
boost::asio::streambuf buffer;
std::ostream writer(&buffer);
bool packet_full = false;
uint32_t sent_lines = 0;
{ //new scope for protobuf streams, these flush in dtor
    google::protobuf::io::OstreamOutputStream osostream(&writer);
    google::protobuf::io::CodedOutputStream output(&osostream);
    std::string lines;
    while(std::getline(reader, line)) {
        lines += line + "\n";
        ++sent_lines;
        if(sent_lines > 100) {
            packet_full = true;
            break;
        }
    }
    if(!lines.empty()) {
        msg->set_text(lines);
        const uint32_t size = msg->ByteSize();
        output.WriteVarint32(size);
        uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
        if(buffer != 0) {
            msg->SerializeWithCachedSizesToArray(buffer);
        } else {
            msg->SerializeWithCachedSizes(&output);
        }
    }
    if(sent_lines > 0) {
        sock.send(buffer.data());
        if(!packet_full && !reader.eof()) { //Read ended, and not due to end of file
            std::cout << "An error occurred" << std::endl;
            break;
        }
        reader.clear(); //clear EOF flag
    }
receiver.cpp
It's a Boost.Asio read callback.
Member variables:
boost::asio::ip::tcp::socket socket_;
boost::asio::streambuf buffer_;
Code
void handle_read_message(const boost::system::error_code& error,
                         size_t bytes_transferred) {
    if(!error) {
        buffer_.commit(bytes_transferred);
        std::istream reader(&buffer_);
        google::protobuf::io::IstreamInputStream isistream(&reader);
        google::protobuf::io::CodedInputStream input(&isistream);
        uint32_t size = 0;
        assert(input.ReadVarint32(&size));
        auto limit = input.PushLimit(size);
        msgs::Line line;
        assert(line.ParseFromCodedStream(&input));
        assert(input.ConsumedEntireMessage());
        input.PopLimit(limit);
        start();
    } else {
        std::cout <<"error during handle_read_message: " << error << std::endl;
    }
}
This is mainly based on https://stackoverflow.com/a/22899712
EDIT: New receiver version, reader_ is now a member variable:
void handle_read_message(const boost::system::error_code& error,
                         size_t bytes_transferred) {
    std::cout << "handle_read_message(" << bytes_transferred << ")" <<std::endl;
    if(!error) {
        buffer_.commit(bytes_transferred);
        uint32_t size = 0;
        google::protobuf::io::IstreamInputStream isistream_(&reader_);
        {
            google::protobuf::io::CodedInputStream input(&isistream_);
            if(!input.ReadVarint32(&size)) {
                std::cout << "Failed to read size, waiting for more data" << std::endl;
                start();
                return;
            }
        }
        std::size_t varint_size = isistream_.ByteCount();
        std::cout <<"varintsize: " << varint_size << ", size: " << size << ", have bytes: " << buffer_.size() << std::endl;
        if(varint_size + size > buffer_.size()) {
            std::cout << "Not enough data received, waiting for more" << std::endl;
            start();
            return;
        }
        google::protobuf::io::CodedInputStream input(&isistream_);
        auto limit = input.PushLimit(size);
        msgs::Line line;
        assert(line.ParseFromCodedStream(&input));
        std::cout << line.text() << std::endl;
        assert(input.ConsumedEntireMessage());
        input.PopLimit(limit);
        start();
    } else {
        std::cout <<"error during handle_read_message: " << error << std::endl;
    }
}
Upvotes: 3
Views: 6504
Reputation: 45171
If you are using async I/O on the receiving end, you will need to ensure that you have really received the entire message before you start parsing. Remember that a TCP connection is a stream. The async callback runs any time there is data available -- even if it is incomplete. You may get only a partial message, or you may get a whole message plus some of the next message. This is why readDelimitedFrom() is needed in the first place: to figure out exactly how many bytes you need to wait for before parsing.
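To make the framing concrete, here is a minimal sketch of what one delimited message looks like on the wire, assuming the usual layout of a base-128 varint length followed by the serialized bytes. It deliberately avoids protobuf and Boost so it can be compiled on its own; `append_varint32` and `frame_message` are hypothetical helper names, not library functions, and the payload is just an opaque string standing in for a serialized message.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: append `value` as a base-128 varint.
// Each byte carries 7 data bits, least significant group first; the high bit
// is set on every byte except the last one.
void append_varint32(std::string& out, std::uint32_t value) {
    while (value >= 0x80) {
        out.push_back(static_cast<char>((value & 0x7F) | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<char>(value));
}

// Hypothetical helper: build one frame = varint length prefix + payload.
// `payload` stands in for the already-serialized protobuf message.
std::string frame_message(const std::string& payload) {
    std::string frame;
    append_varint32(frame, static_cast<std::uint32_t>(payload.size()));
    frame += payload;
    return frame;
}
```

A 3-byte payload gets a 1-byte prefix, while a 300-byte payload gets a 2-byte prefix, so the receiver cannot know the prefix length in advance and has to decode it before it knows how many more bytes to wait for.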
So, when using async I/O, you'll need to code things differently. You could use a strategy like this:
Construct a ZeroCopyInputStream and a CodedInputStream and call ReadVarint32() to read the size. If ReadVarint32() fails, then you haven't received the whole size yet, so stop and wait for more bytes.
If ReadVarint32() succeeds, destroy the CodedInputStream and then call ByteCount() on the underlying ZeroCopyInputStream to find out how many bytes were consumed by the varint.

(Also: There seems to be a missing closing brace in your sender.cpp code. If the original file has the same error, it may be that you're sending data before CodedOutputStream has flushed. But I'm guessing the error is not in the original.)
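The buffering strategy can be sketched without protobuf or Boost as follows. This is only an illustration under stated assumptions: `FrameBuffer`, `append`, `try_pop` and `peek_varint32` are hypothetical names, the hand-rolled varint decoder stands in for CodedInputStream::ReadVarint32(), and the returned `length` plays the role of ByteCount(). The key point is that nothing is consumed from the buffer until both the size prefix and the full body have arrived.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical receive buffer: holds bytes that arrived but are not yet parsed.
class FrameBuffer {
public:
    // Call this from the read callback with whatever bytes just arrived.
    void append(const std::string& bytes) { pending_ += bytes; }

    // Extracts the next complete payload into `out` and returns true.
    // Returns false, leaving the buffer untouched, if more bytes are needed.
    bool try_pop(std::string& out) {
        std::uint32_t size = 0;
        std::size_t varint_len = 0;
        if (!peek_varint32(size, varint_len)) return false;     // prefix incomplete
        if (pending_.size() - varint_len < size) return false;  // body incomplete
        out = pending_.substr(varint_len, size);
        pending_.erase(0, varint_len + size);                    // keep any following bytes
        return true;
    }

private:
    // Decodes a varint32 from the front of the buffer without consuming it.
    // Note: in this sketch a malformed prefix (5 bytes, all with the
    // continuation bit set) is also reported as "not yet"; real code should
    // treat that as a protocol error and close the connection.
    bool peek_varint32(std::uint32_t& value, std::size_t& length) const {
        value = 0;
        for (std::size_t i = 0; i < pending_.size() && i < 5; ++i) {
            const std::uint8_t byte = static_cast<std::uint8_t>(pending_[i]);
            value |= static_cast<std::uint32_t>(byte & 0x7F) << (7 * i);
            if ((byte & 0x80) == 0) {
                length = i + 1;
                return true;
            }
        }
        return false;
    }

    std::string pending_;
};
```

In a read handler you would call `append` once and then call `try_pop` in a loop until it returns false, since a single read may deliver several messages. Each extracted payload could then be handed to the message's ParseFromString() or ParseFromArray(), which avoids parsing directly from a stream that may still be incomplete.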
Upvotes: 6