Reputation: 619
I've been struggling with this for two days straight, given my limited knowledge of C++. I need to parse a sequence of messages from a big file using the protobuf C++ API; the file may contain millions of such messages. Reading straight from the file is easy: I can always call ReadVarint32 to get the size and then call ParseFromCodedStream with the limit pushed on the CodedInputStream, as described in this post. However, the I/O-level API I am working with (libuv, actually) requires a fixed-size buffer to be allocated for every read callback. That block size has nothing to do with the size of the messages I am reading.
This makes my life hard. Every time I read from the file and fill the fixed-size buffer (say 16K), that buffer probably contains hundreds of complete protobuf messages, but its last chunk is likely to be an incomplete message. So I thought: I should read as many messages as I can, then extract the last chunk and attach it to the beginning of the next 16K buffer I read, and keep going until I reach the end of the file. I use ReadVarint32() to get the size and compare that number with the remaining buffer size; if the message size is smaller, I keep reading.
There is an API called GetDirectBufferPointer, so I tried to use it to record the pointer position before I read the next message's size. However, if I just extract the rest of the byte array from where the pointer starts and attach it to the next chunk, parsing fails, and in fact the first several bytes (8, I think) are completely messed up. I suspect some endianness weirdness.
Alternatively, if I call codedStream.ReadRaw() to write the residual stream into the buffer and then attach that to the head of the new chunk, the data is not corrupted. But this time I lose the "size" byte information, because it has already been consumed by ReadVarint32! And even if I remember the size I read last time and call message.ParseFromCodedStream() directly in the next iteration, it ends up reading one byte too few, and part of the data is corrupted, so the object cannot be restored.
std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
mCheckBuffer.clear();
//merge the last remaining chunk that contains incomplete message with
//the new data chunk I got out from buffer. Excuse my terrible C++ foo
std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));
//Treat the new merged buffer array as the new CIS
google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0],
mCheckBuffer.size());
google::protobuf::io::CodedInputStream cis(&ais);
//Record the pointer location on CIS in bResidueBuffer
cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);
//No size information, probably first time or last iteration
//coincidentally read a complete message out. Otherwise I simply
//skip reading size again as I've already populated that from last
//iteration when I got an incomplete message
if(size == 0) {
cis.ReadVarint32(&size);
}
//Have to read this again to get remaining buffer size
cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);
//Compare the next message size with how much left in the buffer, if
//message size is smaller, I know I can read at least one more message
//out, keep reading until I run out of buffer, or, it's the end of message
//and my buffer just allocated larger so size should be 0
while (size <= mResidueBufSize && size != 0) {
//If this cis I constructed didn't have the size info at the beginning,
//and I just read straight from it hoping to get the message out from
//the "size" I got from last iteration, it simply doesn't work
//(read one less byte in fact, and some part of the message corrupted)
//push the size constraint to the input stream;
int limit = cis.PushLimit(size);
//parse message from the input stream
message.ParseFromCodedStream(&cis);
cis.PopLimit(limit);
google::protobuf::TextFormat::PrintToString(message, &str);
printf("%s", str.c_str());
//do something with the parsed object
//Now I have to record the new pointer location again
cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);
//Read another time the next message's size and go back to while loop check
cis.ReadVarint32(&size);
}
//If I do the next line, bResidueBuffer will have the correct CIS information
//copied over, but not having the "already read" size info
cis.ReadRaw(bResidueBuffer, bResidueBufSize);
mResidueBuffer.clear();
//I am constructing a new vector that receives the residual chunk of the
//current buffer that isn't enough to restore a message
//If I don't do ReadRaw, this copy completely messes up at least the first 8
//bytes of the copied buffer's value, due to I suspect endianness
mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0],
&bResidueBuffer[bResidueBufSize]);
}
I'm really out of ideas now. Is it even possible to use protobuf gracefully with APIs that require a fixed-size intermediate buffer? Any input is very much appreciated, thanks!
Upvotes: 1
Views: 3089
Reputation: 619
Okay, thanks to Kenton's help in pointing out the major issues in my question, I have revised the code and tested that it works. I will post my solution here. That said, I am not happy about all the complexity and edge-case checks I needed; I think it's error-prone. Even with this working, what I will probably do for real is make the direct "read from stream" blocking call in another thread, outside my libuv main thread, so that I am not required to use the libuv API. But for the sake of completeness, here's my code:
std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
std::vector<char> mReadBuffer(READ_BUFFER_SIZE);
google::protobuf::uint32 size = 0;
//"in" is the file input stream
while (in.good()) {
//This part is tricky: a read is not guaranteed to fill mReadBuffer.
//After a short read at EOF, the rest of the buffer holds stale bytes.
//The same happens with the buffer libuv hands you.
in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
//merge the last remaining chunk that contains incomplete message with
//the new data chunk I got out from buffer. I couldn't find a more
//efficient way doing that
mCheckBuffer.clear();
mCheckBuffer.reserve(mResidueBuffer.size() + mReadBuffer.size());
mCheckBuffer.insert(mCheckBuffer.end(), mResidueBuffer.begin(),
mResidueBuffer.end());
//Append only the bytes actually read, not the whole buffer
mCheckBuffer.insert(mCheckBuffer.end(), mReadBuffer.begin(),
mReadBuffer.begin() + in.gcount());
//Treat the new merged buffer array as the new CIS
google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0],
mCheckBuffer.size());
google::protobuf::io::CodedInputStream cis(&ais);
//No size information, probably first time or last iteration
//coincidentally read a complete message out. Otherwise I simply
//skip reading size again as I've already populated that from last
//iteration when I got an incomplete message
if(size == 0) {
cis.ReadVarint32(&size);
}
bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
//Compare the next message size with how much left in the buffer, if
//message size is smaller, I know I can read at least one more message
//out, keep reading until I run out of buffer. If, it's the end of message
//and size (next byte I read from stream) happens to be 0, that
//will trip me up, cos when I push size 0 into PushLimit and then try
//parsing, it will actually return true even if it reads nothing.
//So I can get into an infinite loop, if I don't do the check here
while (size <= bResidueBufSize && size != 0) {
//If this cis I constructed didn't have the size info at the
//beginning, and I just read straight from it hoping to get the
//message out from the "size" I got from last iteration
//push the size constraint to the input stream
int limit = cis.PushLimit(size);
//parse the message from the input stream
bool result = message.ParseFromCodedStream(&cis);
//Parse fail, it could be because last iteration already took care
//of the last message and that size I read last time is just junk
//I choose to only check EOF here when result is not true, (which
//leads me to having to check for size=0 case above), cos it will
//be too many checks if I check it everytime I finish reading a
//message out
if(!result) {
if(in.eof()) {
log.info("Reached EOF, stop processing!");
break;
}
else {
log.error("Read error or input mal-formatted! Log error!");
exit(EXIT_FAILURE);
}
}
cis.PopLimit(limit);
google::protobuf::TextFormat::PrintToString(message, &str);
//Do something with the message
//This is when the last message read out exactly reach the end of
//the buffer and there is no size information available on the
//stream any more, in which case size will need to be reset to zero
//so that the beginning of next iteration will read size info first
if(!cis.ReadVarint32(&size)) {
size = 0;
}
bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
}
if(in.eof()) {
break;
}
//Now I am copying the residual buffer into the intermediate
//mResidueBuffer, which will be merged with newly read data in next iteration
mResidueBuffer.clear();
mResidueBuffer.reserve(bResidueBufSize);
mResidueBuffer.insert(mResidueBuffer.end(),
&mCheckBuffer[cis.CurrentPosition()],&mCheckBuffer[mCheckBuffer.size()]);
}
if(!in.eof()) {
log.error("Something else other than EOF happened to the file, log error!");
exit(EXIT_FAILURE);
}
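For anyone who wants the carry-over logic in isolation, here is a minimal sketch that does the framing without protobuf. It assumes the usual base-128 varint length prefix in front of each message. DelimitedFramer, feed, pending_bytes and decode_varint32 are hypothetical names of my own, not protobuf or libuv API. The idea is to leave the size prefix inside the leftover bytes until the whole message has arrived, so the size never has to be remembered between chunks and a prefix that is split across two chunks is handled too.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper (not part of protobuf): accumulates arbitrary chunks and
// returns every complete length-prefixed payload found so far.
class DelimitedFramer {
public:
    // Appends `len` bytes and returns the payloads completed by them.
    std::vector<std::string> feed(const char* data, std::size_t len) {
        pending_.append(data, len);
        std::vector<std::string> out;
        std::size_t pos = 0;
        for (;;) {
            std::uint32_t size = 0;
            const std::size_t header = decode_varint32(pending_, pos, &size);
            if (header == 0) break;                            // prefix incomplete
            if (pending_.size() - pos - header < size) break;  // body incomplete
            out.emplace_back(pending_, pos + header, size);
            pos += header + size;
        }
        pending_.erase(0, pos);  // keep the unparsed tail, size prefix included
        return out;
    }

    std::size_t pending_bytes() const { return pending_.size(); }

private:
    // Returns the number of bytes the varint occupies, or 0 if more input is
    // needed. Simplification: five continuation bytes in a row are also
    // reported as "need more"; real code should treat that as a format error.
    static std::size_t decode_varint32(const std::string& buf, std::size_t pos,
                                       std::uint32_t* value) {
        std::uint32_t result = 0;
        for (std::size_t i = 0; i < 5 && pos + i < buf.size(); ++i) {
            const std::uint8_t byte = static_cast<std::uint8_t>(buf[pos + i]);
            result |= static_cast<std::uint32_t>(byte & 0x7F) << (7 * i);
            if ((byte & 0x80) == 0) {
                *value = result;
                return i + 1;
            }
        }
        return 0;
    }

    std::string pending_;
};
```

In the read callback you would call feed() with the buffer pointer and the number of bytes actually read, then hand each returned payload to something like message.ParseFromArray(payload.data(), payload.size()). It copies more than a CodedInputStream-based loop does, but there are far fewer edge cases to get wrong.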
Upvotes: 0
Reputation: 45296
I see two major problems with your code:
std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));
It looks like you are expecting std::merge to concatenate your buffers, but in fact this function performs a merge of two sorted arrays into a single sorted array, in the sense of merge sort. This doesn't make any sense in this context; mCheckBuffer will end up containing nonsense.
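To make the difference concrete, here is a small sketch; concatenate and merged are hypothetical helper names used only for illustration. Two vector::insert calls keep the bytes in arrival order, while std::merge reorders them by value.

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// Appends `chunk` after `residue`, preserving byte order (what a parser needs).
std::vector<char> concatenate(const std::vector<char>& residue,
                              const std::vector<char>& chunk) {
    std::vector<char> out;
    out.reserve(residue.size() + chunk.size());
    out.insert(out.end(), residue.begin(), residue.end());
    out.insert(out.end(), chunk.begin(), chunk.end());
    return out;
}

// What std::merge does instead: combines two SORTED ranges into one sorted
// range, so bytes from `chunk` can land in front of bytes from `residue`.
std::vector<char> merged(const std::vector<char>& residue,
                         const std::vector<char>& chunk) {
    std::vector<char> out;
    std::merge(residue.begin(), residue.end(), chunk.begin(), chunk.end(),
               std::back_inserter(out));
    return out;
}
```

With residue {5, 6} and chunk {1, 2}, concatenate gives {5, 6, 1, 2}, while merged gives {1, 2, 5, 6}. If the inputs are not sorted, std::merge's precondition is violated and the result is not even specified.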
cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);
Here you are casting &bResidueBuffer to an incompatible pointer type. bResidueBuffer is a char array, so &bResidueBuffer is a pointer to a char array, which is not a pointer to a pointer. This is admittedly confusing because arrays can be implicitly converted to pointers (where the pointer points to the first element of the array), but this is actually a conversion -- bResidueBuffer is itself not a pointer, it can just be converted to one.
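A small sketch of why this matters, using a hypothetical stand-in (get_pointer) for any API with a const void** out-parameter; kBufSize, residue, correct_call and simulate_bad_call are made-up names as well. Such a function stores a pointer value at the address it is given. With the cast from the question, that address is the start of the array, so on a typical 64-bit platform the first 8 bytes of the array get overwritten with a pointer. That would explain the "first 8 bytes messed up" symptom without any endianness involved.

```cpp
#include <cstring>
#include <type_traits>

constexpr int kBufSize = 64;  // hypothetical, stands in for READ_BUFFER_SIZE

char residue[kBufSize];

// &residue is a pointer to the whole array, not a pointer to a pointer.
static_assert(std::is_same<decltype(&residue), char (*)[kBufSize]>::value,
              "address of an array has type char (*)[N]");

// Hypothetical stand-in for an API with a `const void**` out-parameter:
// it stores a pointer value at *out and copies no data.
void get_pointer(const void** out, const char* source) { *out = source; }

// Correct usage: pass the address of a real pointer variable.
const void* correct_call(const char* source) {
    const void* ptr = nullptr;
    get_pointer(&ptr, source);
    return ptr;  // points into `source`; nothing was copied
}

// Effect of the cast in the question: the pointer VALUE lands in the first
// sizeof(void*) bytes of the array. memcpy reproduces that effect here
// without the undefined behaviour of the cast itself.
void simulate_bad_call(char (&array)[kBufSize], const char* source) {
    const void* ptr = nullptr;
    get_pointer(&ptr, source);
    std::memcpy(array, &ptr, sizeof ptr);
}
```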
I think you're also misunderstanding what GetDirectBufferPointer() does. It looks like you want it to copy the rest of the buffer into bResidueBuffer, but the method never copies any data. The method gives you back a pointer that points into the original buffer.
The correct way to call it is something like:
const void* ptr;
int size;
cis.GetDirectBufferPointer(&ptr, &size);
Now ptr will point into the original buffer. You could now compare this against a pointer to the beginning of the buffer to find out where you are in the stream, like:
size_t pos = (const char*)ptr - &mCheckBuffer[0];
But you shouldn't do that, because CodedInputStream already has the method CurrentPosition() for exactly this purpose. That will return the current byte offset in the buffer. So, use that instead.
Upvotes: 1