Takatoshi Kondo

Reputation: 3550

How to decompress the data in boost::asio::streambuf using boost::iostreams::filtering_istream?

I'm trying to decompress zlib-compressed data received from a Boost.Asio socket (Boost version 1.64.0). I use boost::asio::streambuf as the receive buffer and boost::iostreams::filtering_istream to decompress. If I send the compressed data all at once, my code works as expected. However, if I send the compressed data in 5-byte chunks, the code doesn't work as expected.

I create the compressed data as follows:

auto str = prepare_compressed_data("Hello world");

The data are:

78 9c f3 48 cd c9 c9 57 28 cf 2f ca 49 01 00 18 ab 04 3d
(Length is 19)

I set up the filtering_istream to decompress as follows:

// Prepare decompressing istream
boost::asio::streambuf sbuf;
boost::iostreams::filtering_istream is;
is.push(boost::iostreams::zlib_decompressor());
is.push(sbuf);

Here is the network send-receive emulation code:

// Network (asio socket send-receive) emulation
auto str_it = str.begin();
auto rest = str.size();
while (rest != 0) {
    auto copy_size = std::min(rest, send_size); // send_size is 5

    // Emulate receive
    //   In actual code, it is `socket.read_some(mbuf)`
    //   and its return value is `copy_size`.

    auto mbuf = sbuf.prepare(copy_size);
    char* p = boost::asio::buffer_cast<char*>(mbuf);
    std::copy(str_it, str_it + copy_size, p);
    sbuf.commit(copy_size);
    hexdump(std::string(boost::asio::buffer_cast<char const*>(sbuf.data()), sbuf.size()));
    std::cout << "sbuf.size():" << sbuf.size() << std::endl;
    { // decompress
        std::cout << "<<< try decompress >>>" << std::endl;
        while (is) {
            std::cout << "  `is` has some data." << std::endl;
            char buf[buf_size + 1] = { '\0' };
            is.read(buf, buf_size);
            std::size_t read_size = is.gcount();
            std::cout << "  read_size:" << read_size << std::endl;
            std::cout << "  decompressed data: " << buf << std::endl;

            // It seems that is.read() consumed 5 bytes data in underlying sbuf
            // even if is.gcount() returned 0.
        }
        std::cout << "<<< decompress loop out >>>" << std::endl;
    }
    rest -= copy_size;
    str_it += copy_size;
}

Each iteration of the outer loop copies part of str (a 5-byte chunk) into sbuf. On the first pass through the inner loop, while (is) {, is.read(buf, buf_size) is executed and the following is.gcount() returns 0. I guess that there is not yet enough data to decompress. However, the stream seems to have consumed those 5 bytes from sbuf. When the next 5 bytes are copied, the condition of the inner while loop is no longer satisfied.

Am I missing something?

The complete code is here: https://gist.github.com/redboltz/1934952be51e73d558fe8cd7e861d4da

I got the following output:

compressed data: 78 9c f3 48 cd c9 c9 57 28 cf 2f ca 49 01 00 18 ab 04 3d 
78 9c f3 48 cd 
sbuf.size():5
<<< try decompress >>>
  `is` has some data.
  read_size:0
  decompressed data: 
<<< decompress loop out >>>
c9 c9 57 28 cf 
sbuf.size():5
<<< try decompress >>>
<<< decompress loop out >>>
c9 c9 57 28 cf 2f ca 49 01 00 
sbuf.size():10
<<< try decompress >>>
<<< decompress loop out >>>
c9 c9 57 28 cf 2f ca 49 01 00 18 ab 04 3d 
sbuf.size():14
<<< try decompress >>>
<<< decompress loop out >>>

If I copy str all at once, I get the following result, as expected:

compressed data: 78 9c f3 48 cd c9 c9 57 28 cf 2f ca 49 01 00 18 ab 04 3d 
78 9c f3 48 cd c9 c9 57 28 cf 2f ca 49 01 00 18 ab 04 3d 
sbuf.size():19
<<< try decompress >>>
  `is` has some data.
  read_size:11
  decompressed data: Hello world
<<< decompress loop out >>>

Upvotes: 2

Views: 701

Answers (1)

sehe

Reputation: 392911

When you read from incomplete input, the filtering stream signals EOF, and that EOF state stays set even after more data arrives.

The safest way to work around it is to tentatively copy the sbuf data, and only consume() the data on successful decompression:

#include <sstream>
#include <string>
#include <iostream>
#include <iomanip>
#include <algorithm>

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/zlib.hpp>

#include <boost/asio.hpp>

void hexdump(std::string const& buf) {
    for (std::string::const_iterator it = buf.begin(), end = buf.end();
         it != end;
         ++it) {
        std::cout
            << std::setw(2)
            << std::hex
            << std::setfill('0')
            << (static_cast<int>(*it) & 0xff)
            << ' ';
    }
    std::cout << std::dec << std::endl;
}

std::string prepare_compressed_data(std::string const& str) {
    std::stringstream sender;
    boost::iostreams::filtering_streambuf<boost::iostreams::input> out;
    out.push(boost::iostreams::zlib_compressor());
    out.push(sender);
    sender << str << std::flush;
    std::stringstream compressed;
    boost::iostreams::copy(out, compressed);
    return compressed.str();
}

int main() {
    auto str = prepare_compressed_data("Hello world");
    std::cout << "compressed data: ";
    hexdump(str);

    // Test settings
#if 0
    // send all at once
    std::size_t const send_size = str.size();
#else
    // send 5 byte chunks
    std::size_t const send_size = 5;
#endif
    std::size_t const buf_size = 256;

    // Receive buffer (the decompressing istream is rebuilt for each attempt below)
    boost::asio::streambuf sbuf;

    // Network (asio socket send-receive) emulation
    auto str_it = str.begin();
    auto rest = str.size();
    while (rest != 0) {
        auto copy_size = std::min(rest, send_size);

        // Emulate receive
        //   In actual code, it is `socket.read_some(mbuf)`
        //   and its return value is `copy_size`.

        auto mbuf = sbuf.prepare(copy_size);
        char* p = boost::asio::buffer_cast<char*>(mbuf);
        std::copy(str_it, str_it + copy_size, p);
        sbuf.commit(copy_size);

        hexdump(std::string(boost::asio::buffer_cast<char const*>(sbuf.data()), sbuf.size()));
        std::cout << "sbuf.size():" << sbuf.size() << std::endl;
        { // decompress
            std::cout << "<<< try decompress >>>" << std::endl;
            char buf[buf_size] = {};
            std::size_t read_size = 0;

            boost::iostreams::filtering_istream is;
            is.push(boost::iostreams::zlib_decompressor());
            std::stringstream ss(std::string(boost::asio::buffer_cast<char const*>(sbuf.data()), sbuf.size()));
            is.push(ss);

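            // Always refresh read_size from gcount(), even when read() fills buf completely
            while (is.read(buf, buf_size), (read_size = static_cast<std::size_t>(is.gcount())) != 0) {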
                std::cout << "  `is` has some data." << std::endl;
                std::cout << "  read_size:" << read_size << std::endl;
                (std::cout << "  decompressed data: ").write(buf, read_size) << std::endl;

                // `is` reads from a copy (ss), so sbuf itself is untouched here;
                // sbuf.consume() should only be called once decompression succeeds.
            }
            std::cout << "<<< decompress loop out >>>" << std::endl;
        }
        rest -= copy_size;
        str_it += copy_size;
    }
}

Prints

compressed data: 78 9c f3 48 cd c9 c9 57 28 cf 2f ca 49 01 00 18 ab 04 3d 
78 9c f3 48 cd 
sbuf.size():5
<<< try decompress >>>
<<< decompress loop out >>>
78 9c f3 48 cd c9 c9 57 28 cf 
sbuf.size():10
<<< try decompress >>>
<<< decompress loop out >>>
78 9c f3 48 cd c9 c9 57 28 cf 2f ca 49 01 00 
sbuf.size():15
<<< try decompress >>>
<<< decompress loop out >>>
78 9c f3 48 cd c9 c9 57 28 cf 2f ca 49 01 00 18 ab 04 3d 
sbuf.size():19
<<< try decompress >>>
  `is` has some data.
  read_size:11
  decompressed data: Hello world
<<< decompress loop out >>>
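
The program above retries on a fresh copy of the buffered bytes each time, but it never reaches the consume() step. As a minimal sketch of that "decode from a copy, consume only on success" pattern, the following uses a plain std::string as a stand-in for boost::asio::streambuf and a hypothetical try_decode() as a stand-in for the zlib attempt (a toy framing where the first byte is the payload length). Both names, and the framing itself, are assumptions for illustration only and are not part of Boost.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for "try to decompress a copy of the buffered bytes".
// Toy framing (an assumption): the first byte holds the payload length.
// Returns false while the message is still incomplete and leaves `pending` alone.
bool try_decode(std::string const& pending, std::string& out, std::size_t& used) {
    if (pending.empty()) return false;
    std::size_t const len = static_cast<unsigned char>(pending[0]);
    if (pending.size() < 1 + len) return false;  // not enough data yet
    out = pending.substr(1, len);
    used = 1 + len;
    return true;
}

// Appends a received chunk, attempts a decode, and drops bytes from the
// buffer only when the decode succeeded (analogous to sbuf.consume(used)).
bool on_receive(std::string& pending, std::string const& chunk, std::string& out) {
    pending += chunk;
    std::size_t used = 0;
    if (!try_decode(pending, out, used)) return false;  // keep everything buffered
    pending.erase(0, used);
    return true;
}
```

With the real types, the erase() call would correspond to sbuf.consume(), issued only after the decompressor has produced the complete output.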

Alternatively, you can "hack it" by clearing the stream flags, but I fear the resulting behavior is not documented (i.e. unreliable at best).
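
To illustrate what "stays signaled" and "clearing the stream flags" mean, here is a minimal sketch using only standard streams: a std::istream over a std::stringbuf, not a Boost filter chain. The helper names (append, read_available, run_demo) are made up for this example. It shows that a read which hits end-of-data sets eofbit/failbit, that later reads return nothing even after more bytes are appended, and that clear() lets reading resume. Whether a zlib_decompressor in a filtering_istream resumes cleanly after clear() is not something this sketch demonstrates.

```cpp
#include <cassert>
#include <cstddef>
#include <ios>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Appends bytes directly to the stream buffer, bypassing the istream's state.
void append(std::stringbuf& sb, std::string const& data) {
    sb.sputn(data.data(), static_cast<std::streamsize>(data.size()));
}

// Tries to read up to 64 bytes and returns whatever gcount() reports.
std::string read_available(std::istream& is) {
    char buf[64];
    is.read(buf, sizeof buf);
    return std::string(buf, static_cast<std::size_t>(is.gcount()));
}

// Returns the results of three reads: before EOF, after EOF, after clear().
std::vector<std::string> run_demo() {
    std::stringbuf sb(std::ios_base::in | std::ios_base::out);
    std::istream is(&sb);
    std::vector<std::string> results;

    append(sb, "Hello");
    results.push_back(read_available(is));  // short read sets eofbit and failbit

    append(sb, " world");
    results.push_back(read_available(is));  // stream is still failed: reads nothing

    is.clear();                             // reset the sticky state flags
    results.push_back(read_available(is));  // the appended bytes are now readable
    return results;
}
```

This mirrors the symptom in the question: once the inner while (is) loop has seen EOF, the condition stays false on every later chunk unless the flags are reset or a fresh stream is created.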

Upvotes: 1
