Summit

Reputation: 2258

Boost Server breaking the messages from the client

If I send messages locally, from the same system, then the Boost server receives them properly.

When the client is a remote application on another system, sending messages over TCP/IP, then some messages randomly get broken in two (as if by a line break).

For example, if the client has sent "THIS IS A MESSAGE", the server will read it as

"THIS IS A ME

SSAGE"

This is the Server class.

#pragma once

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <iostream>
#include <global.h>
#include <memory>
#include <fstream>
#include <iomanip>
#include <queue>

using boost::asio::ip::tcp;


class session
    : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket)
        : socket_(std::move(socket))
    {

    }

    void start()
    {

        auto self(shared_from_this());
        // dispatch not strictly necessary for single-threaded contexts
        dispatch(
            socket_.get_executor(),
            [this, self]
            {
                do_read();
            });
    }

private:

    
    void handleCommand()
    {
          enqueueAnswer();
    }


    void enqueueAnswer()
    {

        if (stdqueAnswers.size() == 1)
        {
            do_write();
        }
    }


    void do_read()
    {
        auto self(shared_from_this());
        socket_.async_read_some(boost::asio::buffer(data_, max_length),
            [this, self](boost::system::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    if (length > 0) {

                        // If the message has a leading 1, then we have to send an answer back to the client.
                        if (data_[0] == '1') {
                            std::string stdstrCmd(data_);
                            stdstrCmd.erase(0, 2);
                            wavefrontAccess->ReceiveCommandExternalGet(stdstrCmd);
                            handleCommand();
                        }
                        else
                        {
                        
                            std::string strData(data_, length);
                            if(!strData.empty() || strData.find_first_not_of(' ') != std::string::npos)
                            {
                                // There's a non-space.
                                commandsQueue.push(strData);  // this is std Queue
                            
                            }
                        }
                        
                    }
                    do_read();

                }
            });

    }

    void do_write()
    {
        if (stdqueAnswers.empty())
            return;

        auto self(shared_from_this());

        async_write(
            socket_,
            boost::asio::buffer(stdqueAnswers.front()),
            [this, self](boost::system::error_code ec, size_t)
            {
                if (!ec)
                {
                    stdqueAnswers.pop();
                    do_write();
                }
            });

    }

    tcp::socket socket_;
    enum { max_length = 12000 };
    char data_[max_length];
};

class server
{
public:
    server(boost::asio::io_context& io_context, std::uint16_t port)
        : acceptor_{ io_context, tcp::endpoint(tcp::v4(), port) }
    {
        acceptor_.listen();
        do_accept();
    }

private:
    void do_accept()
    {
        acceptor_.async_accept(
            make_strand(acceptor_.get_executor()),
            [this](boost::system::error_code ec, tcp::socket socket)
            {
                if (!ec)
                {
                    std::make_shared<session>(std::move(socket))->start();
                    do_accept();
                }
            });
    }

    tcp::acceptor acceptor_;
};

Upvotes: 1

Views: 68

Answers (2)

sehe

Reputation: 394044

As Ruslan explains, you should not use read_some, but a higher-level operation that reads a full "message", as defined by your application-level wire protocol.

You clearly already have some protocol (the leading bytes), and we cannot guess what the rest could be. For simplicity, let's assume that a full message ends with a \n character. Here's my simplifying take:

async_read_until(
    socket_, boost::asio::dynamic_buffer(data_, max_length), "\n",
    [this, self](boost::system::error_code ec, size_t length) {
        std::cerr << "async_read_until() " << ec.message() << std::endl;
        if (!ec) {
            std::string msg = data_.substr(0, length /* - 1*/);
            data_.erase(0, length);

            if (!msg.empty() && msg.front() == '1') {
                // we have to send an answer back to the client
                wavefrontAccess->ReceiveCommandExternalGet(msg.substr(2));
                handleCommand();
            } else {
                if (msg.find_first_not_of(' ') != std::string::npos) {
                    // There's a non-space
                    commandsQueue.push(msg);
                }
            }

            do_read();
        }
    });

I've simplified by making your buffer a std::string directly. This immediately "solves" the complication that read_until may well read more than a single message: anything received after the delimiter simply stays in data_ for the next read.

Uncomment the /* - 1*/ to exclude the \n character from the message.
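To make the buffer bookkeeping concrete, here is a minimal standalone sketch of the substr/erase step with no networking involved. The helper name take_message is hypothetical (it is not part of Asio). It assumes length is the value handed to the completion handler, which for async_read_until is the number of bytes up to and including the delimiter. Stripping a trailing \r is an extra assumption, for clients that send CRLF line endings.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of Boost.Asio): removes the first `length`
// bytes from `buffer` and returns them without the trailing '\n' (and without
// a preceding '\r', in case the client sends CRLF line endings).
// Assumes 1 <= length <= buffer.size() and buffer[length - 1] == '\n'.
std::string take_message(std::string& buffer, std::size_t length) {
    std::string msg = buffer.substr(0, length - 1); // drop the '\n'
    buffer.erase(0, length);                        // keep whatever came after it
    if (!msg.empty() && msg.back() == '\r')
        msg.pop_back();
    return msg;
}
```

Whatever follows the first delimiter (a second message, or the start of one) remains in the buffer and is picked up by the next read.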

With some missing bits mocked up:


#include <boost/asio.hpp>
#include <deque>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <queue>

using boost::asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() {
        auto self(shared_from_this());
        std::cerr << "start() " << socket_.remote_endpoint() << std::endl;
        dispatch(socket_.get_executor(), [this, self] { do_read(); });
    }

  private:
    void handleCommand() { enqueueAnswer(); }

    void enqueueAnswer() {
        if (stdqueAnswers.size() == 1) {
            do_write();
        }
    }

    void do_read() {
        auto self(shared_from_this());

        async_read_until(
            socket_, boost::asio::dynamic_buffer(data_, max_length), "\n",
            [this, self](boost::system::error_code ec, size_t length) {
                std::cerr << "async_read_until() " << ec.message() << std::endl;
                if (!ec) {
                    std::string msg = data_.substr(0, length /* - 1*/);
                    data_.erase(0, length);

                    if (!msg.empty() && msg.front() == '1') {
                        // we have to send an answer back to the client
                        wavefrontAccess->ReceiveCommandExternalGet(msg.substr(2));
                        handleCommand();
                    } else {
                        if (msg.find_first_not_of(' ') != std::string::npos) {
                            // There's a non-space
                            commandsQueue.push(msg);
                        }
                    }

                    do_read();
                }
            });
    }

    void do_write() {
        if (stdqueAnswers.empty())
            return;

        auto self(shared_from_this());

        async_write( //
            socket_, boost::asio::buffer(stdqueAnswers.front()),
            [this, self](boost::system::error_code ec, size_t) {
                std::cerr << "async_write() " << ec.message() << std::endl;
                if (!ec) {
                    stdqueAnswers.pop_front(); // remove the answer that was just written (front())
                    do_write();
                }
            });
    }

    enum { max_length = 12000 };
    tcp::socket             socket_;
    std::string             data_;
    std::deque<std::string> stdqueAnswers;
    std::queue<std::string> commandsQueue;

    struct WavefrontAccess {
        session* _sess;

        void ReceiveCommandExternalGet(std::string cmd) {
            _sess->stdqueAnswers.push_back("reply for '" + std::move(cmd) + "'");
        }
    };
    std::unique_ptr<WavefrontAccess> wavefrontAccess =
        std::make_unique<WavefrontAccess>(WavefrontAccess{this});
};

class server {
  public:
    server(boost::asio::io_context& io_context, uint16_t port)
        : acceptor_{io_context, {{}, port}} {
        acceptor_.listen();
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept(
            make_strand(acceptor_.get_executor()),
            [this](boost::system::error_code ec, tcp::socket socket) {
                std::cerr << "accept: " << ec.message() << " "
                          << (ec ? tcp::endpoint{} : socket.remote_endpoint())
                          << std::endl;

                if (!ec) {
                    std::make_shared<session>(std::move(socket))->start();
                    do_accept();
                }
            });
    }

    tcp::acceptor acceptor_;
};

int main() {
    boost::asio::io_context ioc(1);
    server s(ioc, 7878);

    ioc.run();
}


Upvotes: 1

Ruslan Tushov

Reputation: 1223

Even locally, messages may be split into smaller TCP segments, on both the client and the server side.

A "framing" protocol must be able to encode and decode a sequence of variable-sized messages unambiguously, no matter how the byte stream gets split into parts (e.g. abc, ab c, a bc, a b c).

Your protocol can do the "framing" itself, or work on top of another "framing" protocol.
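As an illustration of that requirement, here is a minimal sketch of a newline-delimited decoder that yields the same messages however the stream is chopped up. The names LineDecoder and decode_all are made up for this example, and it assumes messages never contain '\n' themselves.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: accumulates arbitrary chunks and returns every complete
// '\n'-terminated message that has become available.
// Assumes a message never contains '\n' itself.
class LineDecoder {
  public:
    std::vector<std::string> feed(const std::string& chunk) {
        pending_ += chunk;
        std::vector<std::string> messages;
        std::string::size_type pos;
        while ((pos = pending_.find('\n')) != std::string::npos) {
            messages.push_back(pending_.substr(0, pos)); // without the '\n'
            pending_.erase(0, pos + 1);
        }
        return messages; // an incomplete tail stays in pending_
    }

  private:
    std::string pending_;
};

// Feeds all chunks through a fresh decoder and collects the decoded messages.
std::vector<std::string> decode_all(const std::vector<std::string>& chunks) {
    LineDecoder decoder;
    std::vector<std::string> all;
    for (const std::string& chunk : chunks) {
        std::vector<std::string> msgs = decoder.feed(chunk);
        all.insert(all.end(), msgs.begin(), msgs.end());
    }
    return all;
}
```

Feeding "THIS IS A ME" followed by "SSAGE\n" produces one message, exactly as feeding "THIS IS A MESSAGE\n" in one piece does.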

Example: TCP

The fixed-length header (20 bytes) leads to the size(s) of the variable-size parts: the options via the header's data offset field, and the message content via the length of the enclosing IP packet.

TCP reads 20 bytes.
TCP parses message size.
TCP reads that amount of bytes.
(repeat)
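The same read-header, parse-size, read-payload loop can be sketched for an application-level protocol. The wire format below (a 4-byte big-endian payload size followed by the payload) is an assumption for illustration only, not the asker's protocol, and try_take_frame is a made-up name.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical wire format (an assumption for this sketch):
// a 4-byte big-endian payload size, followed by exactly that many bytes.
constexpr std::size_t header_size = 4;

// If `buffer` starts with a complete frame, moves its payload into `payload`,
// removes the frame from `buffer` and returns true. Returns false (leaving
// `buffer` untouched) while the header or the payload is still incomplete.
bool try_take_frame(std::string& buffer, std::string& payload) {
    if (buffer.size() < header_size)
        return false; // step 1: need the whole fixed-size header

    std::uint32_t size = 0; // step 2: parse the payload size
    for (std::size_t i = 0; i < header_size; ++i)
        size = (size << 8) | static_cast<unsigned char>(buffer[i]);

    if (buffer.size() - header_size < size)
        return false; // step 3: need the whole payload

    payload = buffer.substr(header_size, size);
    buffer.erase(0, header_size + size);
    return true; // (repeat): call again for the next frame
}
```

Because the size is known up front, the payload may contain any bytes, including newlines or zero bytes.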

Example: HTTP

The header doesn't have a fixed length.
However, its structure is unambiguous.
It's text that ends with \r\n\r\n.
This text mustn't include \r\n\r\n itself, but it can represent it with escaping.
Somewhere in that text is the size of the message (the Content-Length header).

HTTP reads text (character by character) until it sees \r\n\r\n.
HTTP parses message size.
HTTP reads that amount of bytes.
(repeat)
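A heavily simplified sketch of those three steps follows. Real HTTP parsing has many more rules (case-insensitive header names, chunked transfer encoding, and so on); this version assumes a header spelled exactly "Content-Length: N", and the function name try_take_http_message is made up for the example.

```cpp
#include <cstddef>
#include <string>

// Simplified sketch, not a conforming HTTP parser. Assumes the size is given
// by a header spelled exactly "Content-Length: N"; a missing header is treated
// as an empty body. std::stoul throws if no digits follow the header name.
bool try_take_http_message(std::string& buffer, std::string& head, std::string& body) {
    const std::string terminator = "\r\n\r\n";
    const std::string::size_type head_end = buffer.find(terminator);
    if (head_end == std::string::npos)
        return false; // header text is not complete yet

    std::size_t body_size = 0;
    const std::string key = "Content-Length: ";
    const std::string::size_type key_pos = buffer.find(key);
    if (key_pos != std::string::npos && key_pos < head_end) {
        const std::size_t value_begin = key_pos + key.size();
        body_size = std::stoul(buffer.substr(value_begin, head_end - value_begin));
    }

    const std::size_t body_begin = head_end + terminator.size();
    if (buffer.size() - body_begin < body_size)
        return false; // body is not complete yet

    head = buffer.substr(0, head_end);
    body = buffer.substr(body_begin, body_size);
    buffer.erase(0, body_begin + body_size); // the next message stays in buffer
    return true;
}
```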

Possible solution

If your message text has no restrictions (e.g. it may include the terminator/escape string), then you need to write the message size before each message.

Otherwise, if your messages have structure, you can write a "terminator" (e.g. end-of-line or a zero terminator) after the end of each message, and read each message until the "terminator".
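On the sending side, the two options could look roughly like this. Both function names and the 4-byte big-endian size prefix are assumptions made for the sketch; the receiver has to agree on whichever format you pick.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

// Option 1 (hypothetical format): 4-byte big-endian size, then the message.
// Works for arbitrary content, because the receiver never scans the payload.
std::string frame_with_size(const std::string& msg) {
    if (msg.size() > 0xFFFFFFFFu)
        throw std::length_error("message too large for a 4-byte size");
    const std::uint32_t n = static_cast<std::uint32_t>(msg.size());
    std::string out;
    out.push_back(static_cast<char>((n >> 24) & 0xFF));
    out.push_back(static_cast<char>((n >> 16) & 0xFF));
    out.push_back(static_cast<char>((n >> 8) & 0xFF));
    out.push_back(static_cast<char>(n & 0xFF));
    out += msg;
    return out;
}

// Option 2: terminator. Only unambiguous if the message itself can never
// contain the terminator, so this sketch rejects such messages.
std::string frame_with_newline(const std::string& msg) {
    if (msg.find('\n') != std::string::npos)
        throw std::invalid_argument("message must not contain the terminator");
    return msg + '\n';
}
```

With option 2, the server side can read with a delimiter-based operation; with option 1, it reads the fixed-size prefix first and then exactly that many bytes.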

Upvotes: 1
