laurapons
laurapons

Reputation: 1103

boost::asio::async_read_until with custom match_char to accept only JSON format

I've been trying to change match_char function to accept only JSON messages when reading data from a socket.

I have 2 implementations (one does not work and the other one works but I don't think it's efficient).

1- First approach (working)

    typedef boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> buffer_iterator;

    static std::pair<buffer_iterator, bool> match_json2(const buffer_iterator begin,
                                                            const buffer_iterator end) {
        buffer_iterator i = begin;
        while (i != end) {
            if ((*i == ']') || (*i == '}')) {
                return std::make_pair(i, true);
            }
            *i++;
        }
        return std::make_pair(i, false);
    }

With this definition, I read in a loop and reconstruct the json. This is a working version, but if I receive a message different from a valid json, I stay in the loop, can't clear tmp_response and never recover from it...

        std::string read_buffer_string() {
            std::string response;
            bool keepReading = true;
            while (keepReading) {
                std::string tmp_response;
                async_read_until(s, ba::dynamic_buffer(tmp_response), match_json2, yc);
                if (!tmp_response.empty()) {
                    response += tmp_response;
                    if (nlohmann::json::accept(response)) {
                        keepReading = false;
                    }
                }
            }
            return response;
        }
  1. Second approach (not working). Ideally I would like something like this one (this implementation doesn't work because begin iterator doesn't always point to the start of the message - I guess some data is already been transferred to the buffer-, and therefore match_json returns invalid values.

     static std::pair<buffer_iterator, bool> match_json(const buffer_iterator begin,
                                                             const buffer_iterator end) {
         buffer_iterator i = begin;
         while (i != end) {
             if ((*i == ']') || (*i == '}')) {
                 std::string _message(begin, i);
                 std::cout << _message << std::endl;
                 if (nlohmann::json::accept(_message)) {
                     return std::make_pair(i, true);
                 }
             }
             *i++;
         }
         return std::make_pair(i, false);
     }
    

And then call it like this:

        std::string read_buffer_string() {
            std::string response;
            async_read_until(s, ba::dynamic_buffer(response), match_json, yc);
            return response;
        }

Does anybody now a more efficient way to do it? Thanks in advance! :)

Upvotes: 1

Views: 603

Answers (3)

sehe
sehe

Reputation: 393769

Of course, right after posting my other answer I remembered that Boost has accepted Boost JSON in 1.75.0.

It does stream parsing way more gracefully: https://www.boost.org/doc/libs/1_75_0/libs/json/doc/html/json/ref/boost__json__stream_parser.html#json.ref.boost__json__stream_parser.usage

It actually deals with trailing data as well!

stream_parser p;                  // construct a parser
std::size_t n;                    // number of characters used
n = p.write_some( "[1,2" );       // parse some of a JSON
assert( n == 4 );                 // all characters consumed
n = p.write_some( ",3,4] null" ); // parse the remainder of the JSON
assert( n == 6 );                 // only some characters consumed
assert( p.done() );               // we have a complete JSON
value jv = p.release();           // take ownership of the value

I would also submit that this could be a better match for a CompletionCondition: see https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/reference/read/overload3.html

Here's an implementation that I tested with:

template <typename Buffer, typename SyncReadStream>
static size_t read_json(SyncReadStream& s, Buffer buf,
    boost::json::value& message, boost::json::parse_options options = {})
{
    boost::json::stream_parser p{{}, options};

    size_t total_parsed = 0;
    boost::asio::read(s, buf, [&](boost::system::error_code ec, size_t /*n*/) {
        size_t parsed = 0;

        for (auto& contiguous : buf.data()) {
            parsed += p.write_some(
                boost::asio::buffer_cast<char const*>(contiguous),
                contiguous.size(), ec);
        }
        buf.consume(parsed);
        total_parsed += parsed;
        return ec || p.done(); // true means done
    });

    message = p.release(); // throws if incomplete
    return total_parsed;
}

Adding a delegating overload for streambufs:

template <typename SyncReadStream, typename Alloc>
static size_t read_json(SyncReadStream& s,
    boost::asio::basic_streambuf<Alloc>& buf,
    boost::json::value& message,
    boost::json::parse_options options = {})
{
    return read_json(s, boost::asio::basic_streambuf_ref<Alloc>(buf), message, options);
}

Demo Program

This demo program adds the test-cases from earlier as well as a socket client with some benchmark statistics added. Arguments:

  • test to run the tests instead of the socket client
  • streambuf to use the streambuf overload instead of std::string dynamic buffer
  • comments to allow comments in the JSON
  • trailing_commas to allow trailing commas in the JSON
  • invalid_utf8 to allow invalid utf8 in the JSON

Live On Compiler Explorer¹

#include <boost/spirit/home/x3.hpp>
#include <boost/fusion/adapted.hpp>
#include <iomanip>
#include <iostream>
namespace x3 = boost::spirit::x3;

int main() {
    std::string const s = 
        "? 8==2 : true ! false"
        "? 9==3 : 'book' ! 'library'";

    using expression = std::string;
    using ternary = std::tuple<expression, expression, expression>;
    std::vector<ternary> parsed;

    auto expr_ = x3::lexeme [+~x3::char_("?:!")];
    auto ternary_ = "?" >> expr_ >> ":" >> expr_ >> "!" >> expr_;

    std::cout << "=== parser approach:\n";
    if (x3::phrase_parse(begin(s), end(s), *x3::seek[ ternary_ ], x3::space, parsed)) {

        for (auto [cond, e1, e2] : parsed) {
            std::cout
                << " condition " << std::quoted(cond) << "\n"
                << " true expression " << std::quoted(e1) << "\n"
                << " else expression " << std::quoted(e2) << "\n"
                << "\n";
        }
    } else {
        std::cout << "non matching" << '\n';
    }
}

With test prints:

 ----- valid test cases
Testing {}                     -> Success {}
Testing {"a":4, "b":5}         -> Success {"a":4,"b":5}
Testing []                     -> Success []
Testing [4, "b"]               -> Success [4,"b"]
 ----- incomplete test cases
Testing {                      -> (incomplete...)
Testing {"a":4, "b"            -> (incomplete...)
Testing [                      -> (incomplete...)
Testing [4, "                  -> (incomplete...)
 ----- invalid test cases
Testing }                      -> syntax error
Testing "a":4 }                -> Success "a" -- remaining `:4 }`
Testing ]                      -> syntax error
 ----- excess input test cases
Testing {}{"a":4, "b":5}       -> Success {} -- remaining `{"a":4, "b":5}`
Testing []["a", "b"]           -> Success [] -- remaining `["a", "b"]`
Testing {} bogus trailing data -> Success {} -- remaining `bogus trailing data`

With socket client some demos:

Mean packet size: 16 in 2 packets
Request: 28 bytes
Request: {"a":4,"b":"5"} bytes
Remaining data: "bye
"
took 0.000124839s, ~0.213899MiB/s

With a large (448MiB) location_history.json:

Mean packet size: 511.999 in 917791 packets
Request: 469908167 bytes
 (large request output suppressed)
took 3.30509s, ~135.59MiB/s

enter image description here


¹ linking non-header only libraries is not supported on Compiler Explorer

Upvotes: 1

Stewart
Stewart

Reputation: 5052

In my case, I am expecting to receive a file. The tranmission starts with a json header containing.

{ "filename":"example.bin", "size":"12345" }

Then the next byte contains the start of the binary file.


I think this should work:

First we create a MatchCondition which will cause async_read_until to identify how much of a transmission is json. I do this by using { and } to increment/decrement the depth and then. Then stopping when I hit depth=0:

typedef boost::asio::buffers_iterator<boost::asio::const_buffers_1> iterator;
std::pair<iterator,bool> json_end(iterator begin, iterator end)
{
    bool inJson = false;
    int depth = 0;
    for(auto it = begin; it != end; ++it)
    {
        if (*it == '{' && ++depth)
        {
            inJson = true;
        }
        else if (*it == '}' && inJson && (--depth <= 0) )
        {
            return std::make_pair(it, true);
        }
    }
    return std::make_pair(end, false);
}

Then we simply call it:

boost::asio::streambuf m_headerBuf;
...

void FileClientPut::ReadHeader()
{
    async_read_until(m_socket, m_headerBuf, json_end,
        boost::bind(
            &FileClientPut::ProcessHeader,
            shared_from_this(),
            Placeholders::error,
            Placeholders::bytes_transferred
        )
    );
}

Then we can implement the parsing of the json header and read the binary file:

void FileClientPut::ProcessHeader(ErrorCode ec, size_t /*bytes*/)
{
    if (ec)
        return;

    std::istream requestStream(&m_headerBuf);
    boost::property_tree::ptree pt;
    
    boost::property_tree::read_json(requestStream,pt);

    m_fileName = pt.get("name","defaultName");
    m_fileSize = pt.get("size", 0);

    m_outputFile.open(m_fileName, std::ios_base::binary);

    ReadFile();
}

Then we start reading the binary file:

void FileClientPut::ReadFile()
{
    m_socket.async_read_some(
        boost::asio::buffer(m_fileBuf.data(), m_fileBuf.size()),
        boost::bind(
            &FileClientPut::ProcessFile,
            shared_from_this(),
            Placeholders::error,
            Placeholders::bytes_transferred
        )
    );
}

Then we read the file:

void FileClientPut::ReadFile()
{
    m_socket.async_read_some(
        boost::asio::buffer(m_fileBuf.data(), m_fileBuf.size()),
        boost::bind(
            &FileClientPut::ProcessFile,
            shared_from_this(),
            Placeholders::error,
            Placeholders::bytes_transferred
        )
    );
}

and process the results:

void FileClientPut::ProcessFile(ErrorCode ec, size_t bytes)
{
    if (ec)
        return;

    if (bytes > 0)
    {
        m_outputFile.write(m_fileBuf.data(), static_cast<std::streamsize>(bytes));

        if (m_outputFile.tellp() >= static_cast<std::streamsize>(m_fileSize))
            return;
    }

    ReadFile();
}

Upvotes: 0

sehe
sehe

Reputation: 393769

TL/DR;

Seriously, just add framing to your wire protocol. E.g. even HTTP responses do this (e.g. via the content length headers, and maybe chunked encoding)

UPDATE:

Instead of handrolling you can go with Boost JSON as I added in another answer


The first approach is flawed, because you are using "async_read_until" yet treat the operation as if it were synchronous.

The second problem is, neither json::parse nor json::accept can report the location of a complete/broken parse. This means that you really do need framing in your wire protocol, because you CANNOT detect message boundaries.

The rest of this answer will first dive in to expose how the limitations of the nlohmann::json library make your task impossible¹.

So even though it's commendable for you to use an existing library, we look for alternatives.

Making It Work(?)

You could use the approach that Beast uses (http::read(s, buf, http::message<>). That is: have a reference to the entire buffer.

flat_buffer buf;
http::request<http::empty_body> m;
read(s, buf, m); // is a SyncStream like socket

Here, read is a composed operation over the message as well as the buffer. This makes it easy to check the completion criteria. In our case, let's make a reader that also serves as a match-condition:

template <typename DynamicBuffer_v1>
struct JsonReader {
    DynamicBuffer_v1 _buf;
    nlohmann::json message;

    JsonReader(DynamicBuffer_v1 buf) : _buf(buf) {}

    template <typename It>
    auto operator()(It dummy, It) {
        using namespace nlohmann;

        auto f = buffers_begin(_buf.data());
        auto l = buffers_end(_buf.data());
        bool ok = json::accept(f, l);
        if (ok) {
            auto n = [&] {
                std::istringstream iss(std::string(f, l));
                message = json::parse(iss);
                return iss.tellg(); // detect consumed
            }();

            _buf.consume(n);
            assert(n);
            std::advance(dummy, n);
            return std::pair(dummy, ok);
        } else {
            return std::pair(dummy, ok);
        }
    }
};

namespace boost::asio {
    template <typename T>
    struct is_match_condition<JsonReader<T>> : public boost::true_type { };
}

This is peachy and works on the happy path. But you run into big trouble on edge/error cases:

  • you can't distinguish incomplete data from invalid data, so you MUST assume that unaccepted input is just incomplete (otherwise you would never wait for data to be complete)
  • you will wait until infinity for data to become "valid" if it's just invalid or
  • worse still: keep reading indefinitely, possibly running out of memory (unless you limit the buffer size; this could lead to a DoS)
  • perhaps worst of all, if you read more data than the single JSON message (which you can not in general prevent in the context of stream sockets), the original message will be rejected due to "excess input". Oops

Testing It

Here's the test cases that confirm the analysis conclusios predicted:

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <iomanip>

template <typename Buffer>
struct JsonReader {
    static_assert(boost::asio::is_dynamic_buffer_v1<Buffer>::value);
    Buffer _buf;
    nlohmann::json message;

    JsonReader() = default;
    JsonReader(Buffer buf) : _buf(buf) {}

    template <typename It>
    auto operator()(It dummy, It) {
        using namespace nlohmann;

        auto f = buffers_begin(_buf.data());
        auto l = buffers_end(_buf.data());
        bool ok = json::accept(f, l);
        if (ok) {
            auto n = [&] {
                std::istringstream iss(std::string(f, l));
                message = json::parse(iss);
                return iss.tellg(); // detect consumed
            }();

            _buf.consume(n);
            assert(n);
            //std::advance(dummy, n);
            return std::pair(dummy, ok);
        } else {
            return std::pair(dummy, ok);
        }
    }
};

namespace boost::asio {
    template <typename T>
    struct is_match_condition<JsonReader<T>> : public boost::true_type { };
}

static inline void run_tests() {
    std::vector<std::string> valid {
        R"({})",
        R"({"a":4, "b":5})",
        R"([])",
        R"([4, "b"])",
    },
    incomplete {
        R"({)",
        R"({"a":4, "b")",
        R"([)",
        R"([4, ")",
    },
    invalid {
        R"(})",
        R"("a":4 })",
        R"(])",
    },
    excess {
        R"({}{"a":4, "b":5})",
        R"([]["a", "b"])",
        R"({} bogus trailing data)",
    };

    auto run_tests = [&](auto& cases) {
        for (std::string buf : cases) {
            std::cout << "Testing " << std::left << std::setw(22) << buf;
            bool ok = JsonReader { boost::asio::dynamic_buffer(buf) }
                (buf.begin(), buf.end())
                .second;

            std::cout << " -> " << std::boolalpha << ok << std::endl;

            if (ok && !buf.empty()) {
                std::cout << " -- remaining buffer " << std::quoted(buf) << "\n";
            }
        }
    };

    std::cout << " ----- valid test cases \n";
    run_tests(valid);
    std::cout << " ----- incomplete test cases \n";
    run_tests(incomplete);
    std::cout << " ----- invalid test cases \n";
    run_tests(invalid);
    std::cout << " ----- excess input test cases \n";
    run_tests(excess);
}

template <typename SyncReadStream, typename Buffer>
static void read(SyncReadStream& s, Buffer bufarg, nlohmann::json& message) {
    using boost::asio::buffers_begin;
    using boost::asio::buffers_end;

    JsonReader reader{bufarg};;
    read_until(s, bufarg, reader);
    message = reader.message;
}

int main() {
    run_tests();
}

Prints

 ----- valid test cases
Testing {}                     -> true
Testing {"a":4, "b":5}         -> true
Testing []                     -> true
Testing [4, "b"]               -> true
 ----- incomplete test cases
Testing {                      -> false
Testing {"a":4, "b"            -> false
Testing [                      -> false
Testing [4, "                  -> false
 ----- invalid test cases
Testing }                      -> false
Testing "a":4 }                -> false
Testing ]                      -> false
 ----- excess input test cases
Testing {}{"a":4, "b":5}       -> false
Testing []["a", "b"]           -> false
Testing {} bogus trailing data -> false

Looking For Alternatives

You could roll your own as I did in the past:

Or we can look at another library that DOES allow us to either detect boundaries of valid JSON fragments OR detect and leave trailing input.

Hand-Rolled Approach

Here's a simplistic translation to more modern Spirit X3 of that linked answer:

// Note: first iterator gets updated
// throws on known invalid input (like starting with `]' or '%')
template <typename It>
bool tryParseAsJson(It& f, It l)
{
    try {
        return detail::x3::parse(f, l, detail::json);
    } catch (detail::x3::expectation_failure<It> const& ef) {
        throw std::runtime_error("invalid JSON data");
    }
}

The crucial point is that this *in addition to return true/false will update the start iterator according to how far it consumed the input.

namespace JsonDetect {
    namespace detail {
        namespace x3 = boost::spirit::x3;
        static const x3::rule<struct value_> value{"value"};

        static auto primitive_token
            = x3::lexeme[ x3::lit("false") | "null" | "true" ];

        static auto expect_value
            = x3::rule<struct expect_value_> { "expect_value" }
            // array, object, string, number or other primitive_token
            = x3::expect[&(x3::char_("[{\"0-9.+-") | primitive_token | x3::eoi)]
            >> value
            ;

        // 2.4.  Numbers
        // Note our spirit grammar takes a shortcut, as the RFC specification is more restrictive:
        //
        // However non of the above affect any structure characters (:,{}[] and double quotes) so it doesn't
        // matter for the current purpose. For full compliance, this remains TODO:
        //
        //    Numeric values that cannot be represented as sequences of digits
        //    (such as Infinity and NaN) are not permitted.
        //     number = [ minus ] int [ frac ] [ exp ]
        //     decimal-point = %x2E       ; .
        //     digit1-9 = %x31-39         ; 1-9
        //     e = %x65 / %x45            ; e E
        //     exp = e [ minus / plus ] 1*DIGIT
        //     frac = decimal-point 1*DIGIT
        //     int = zero / ( digit1-9 *DIGIT )
        //     minus = %x2D               ; -
        //     plus = %x2B                ; +
        //     zero = %x30                ; 0
        static auto number = x3::double_; // shortcut :)

        // 2.5 Strings
        static const x3::uint_parser<uint32_t, 16, 4, 4> _4HEXDIG;

        static auto char_ = ~x3::char_("\"\\") |
               x3::char_(R"(\)") >> (       // \ (reverse solidus)
                   x3::char_(R"(")") |      // "    quotation mark  U+0022
                   x3::char_(R"(\)") |      // \    reverse solidus U+005C
                   x3::char_(R"(/)") |      // /    solidus         U+002F
                   x3::char_(R"(b)") |      // b    backspace       U+0008
                   x3::char_(R"(f)") |      // f    form feed       U+000C
                   x3::char_(R"(n)") |      // n    line feed       U+000A
                   x3::char_(R"(r)") |      // r    carriage return U+000D
                   x3::char_(R"(t)") |      // t    tab             U+0009
                   x3::char_(R"(u)") >> _4HEXDIG )  // uXXXX                U+XXXX
               ;

        static auto string = x3::lexeme [ '"' >> *char_ >> '"' ];

        // 2.2 objects
        static auto member
            = x3::expect [ &(x3::eoi | '"') ]
            >> string
            >> x3::expect [ x3::eoi | ':' ]
            >> expect_value;

        static auto object
            = '{' >> ('}' | (member % ',') >> '}');

        // 2.3 Arrays
        static auto array
            = '[' >> (']' | (expect_value % ',') >> ']');

        // 2.1 values
        static auto value_def = primitive_token | object | array | number | string;

        BOOST_SPIRIT_DEFINE(value)

        // entry point
        static auto json = x3::skip(x3::space)[expect_value];
    }  // namespace detail
}  // namespace JsonDetect

Obviously you put the implementation in a TU, but on Compiler Explorer we can't: Live On Compiler Explorer, using an adjusted JsonReader prints:

SeheX3Detector
==============
 ----- valid test cases 
Testing {}                     -> true
Testing {"a":4, "b":5}         -> true
Testing []                     -> true
Testing [4, "b"]               -> true
 ----- incomplete test cases 
Testing {                      -> false
Testing {"a":4, "b"            -> false
Testing [                      -> false
Testing [4, "                  -> false
 ----- invalid test cases 
Testing }                      -> invalid JSON data
Testing "a":4 }                -> true -- remaining `:4 }`
Testing ]                      -> invalid JSON data
 ----- excess input test cases 
Testing {}{"a":4, "b":5}       -> true -- remaining `{"a":4, "b":5}`
Testing []["a", "b"]           -> true -- remaining `["a", "b"]`
Testing {} bogus trailing data -> true -- remaining ` bogus trailing data`

NlohmannDetector
================
 ----- valid test cases 
Testing {}                     -> true
Testing {"a":4, "b":5}         -> true
Testing []                     -> true
Testing [4, "b"]               -> true
 ----- incomplete test cases 
Testing {                      -> false
Testing {"a":4, "b"            -> false
Testing [                      -> false
Testing [4, "                  -> false
 ----- invalid test cases 
Testing }                      -> false
Testing "a":4 }                -> false
Testing ]                      -> false
 ----- excess input test cases 
Testing {}{"a":4, "b":5}       -> false
Testing []["a", "b"]           -> false
Testing {} bogus trailing data -> false

Note how we now achieved some of the goals.

  • accepting trailing data - so we don't clobber any data after our message
  • failing early on some inputs that cannot possibly become valid JSON
  • However, we can't fix the problem of waiting indefinitely on /possibly/ incomplete valid data
  • Interestingly, one of our "invalid" test cases was wrong (!). (It is always a good sign when test cases fail). This is because "a" is actually a valid JSON value on its own.

Conclusion

In the general case it is impossible to make such a "complete message" detection work without at least limiting buffer size. E.g. a valid input could start with a million spaces. You don't want to wait for that.

Also, a valid input could open a string, object or array², and not terminate that within a few gigabytes. If you stop parsing before hand you'll never know whether it was ultimately a valid message.

Though you'll inevitably have to deal with network timeout anyways you will prefer to be proactive about knowing what to expect. E.g. send the size of the payload ahead of time, so you can use boost::asio::transfer_exactly and validate precisely what you expected to get.


¹ practically. If you don't care about performance, you could iteratively run accept on increasing lengths of buffer

² god forbid, a number like 0000....00001 though that's subject to parser implementation differences

Upvotes: 0

Related Questions