krismath
krismath

Reputation: 1969

Python socket not receiving all data from C++ Boost asio

In a nutshell

I am trying to send encoded and compressed image data (~300kB) from a boost::asio server (embedded system) and receive it via python socket client (laptop). The client seems to be receiving only 65532 bytes every time while the client is sending around 350000 bytes.

My thoughts are that maybe something is limiting the TCP packet size (either boost::asio or python socket or else) to around 65535 bytes.

Protocol

  1. The client send READY, a string with length 5, to the server.
  2. The server receives READY from the client.
  3. The server transmit the length of the message, a 4-byte uint, to the client (the length is only of the message; i.e. excluding the length itself).
  4. The server transmit the message to the client.
  5. The client receives the length and message from the server.

Description

The server will create a new img_session object when a new connection is made. When a read event occurs, The handle_read() function will be called. img_stream is the zlib compressed and base64 encoded std::string which is 'synchronized' via img_lock.

The client calls run_image() once in a thread. The method will start by sending a READY signal and wait for the server to respond. The server's first response will be the length of the rest of the message. Then it will continuously receive the rest of the message and keep it in the variable message. Meanwhile, a concurrent thread will decode and decompress the message and keep the image in the variable frame.

Code

Server Code (img_session.h)

#ifndef _IMG_SESSION_H_
#define _IMG_SESSION_H_

#include <session.h>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <utility.h>
#include <boost/thread.hpp>

class img_session : public session
{
public:
    img_session(boost::asio::io_service& io_service, std::string * img_stream, int*  img_lock);
    ~img_session();

    /* Starts the session. This is called in the constructor.*/
    void start();

private:
    void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
    void handle_write(const boost::system::error_code& error);

    std::string * img_stream;
    int * img_lock;
};

#endif /* _IMG_SESSION_H */

Server Code (img_session.cpp)

#include "img_session.h"
#include <session.h>

/* Public Methods */
img_session:: img_session(boost::asio::io_service& io_service, std::string * img_stream, int * img_lock) : session(io_service), img_stream(img_stream), img_lock(img_lock){
}

img_session::~img_session() { }

void img_session::start()
{
    std::cout << "connection from " << _socket.remote_endpoint() << "\n";
    std::cout << "img_session waiting for READY signal..." << std::endl;
    _socket.async_read_some(boost::asio::buffer(_data, max_length),
        boost::bind(&img_session::handle_read, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}

/* Private Methods */

void img_session::handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{
    if (!error)
    {
        char otherString[6];
        std::cout << "[INFO ] IMG READ" << std::endl;
        if (strncmp(_data, "READY", 5) == 0) {
            std::cout << "[INFO ] READY signal received..." << std::endl;
            while (*img_lock == 1) boost::this_thread::yield();
            *img_lock = 1;
            std::string message = std::string(*img_stream);
            unsigned int len = message.length();

            std::cout << "len = " << len << std::endl;
            unsigned char bytes[4];

            bytes[0] = (len >> 24) & 0xFF;
            bytes[1] = (len >> 16) & 0xFF;
            bytes[2] = (len >> 8) & 0xFF;
            bytes[3] = len & 0xFF;
            message = std::string((char* ) bytes, 4) + message;
            std::cout << "img_session.cpp: bytes[] = " << (int) bytes[0] << " " << (int)bytes[1] << " " << (int)bytes[2] << " " << (int)bytes[3] << std::endl;
            std::cout << "img_session.cpp: message.length() = " << message.length() << std::endl;
            std::cout << "img_session.cpp: HEAD: " << message.substr(0, 1024) << std::endl;
            std::cout << "img_session.cpp: TAIL: " << message.substr(message.length() - 1024, message.length() - 1);


            boost::asio::async_write(_socket,
                boost::asio::buffer(message.c_str(), message.length()), 
                boost::bind(&img_session::handle_write, this,
                    boost::asio::placeholders::error));

            *img_lock = 0;
        }
        else {
            std::cout << "[INFO ] READY signal not received..." << std::endl;
            boost::asio::async_write(_socket,
                boost::asio::buffer("", 1),
                boost::bind(&img_session::handle_write, this,
                    boost::asio::placeholders::error));
        }
        clean();
    }
    else
    {
        delete this;
    }
}

void img_session::handle_write(const boost::system::error_code& error)
{
    if (!error)
    {

        std::cout << "img_session waiting for READY signal..." << std::endl;
        _socket.async_read_some(boost::asio::buffer(_data, max_length),
            boost::bind(&img_session::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
    else
    {
        delete this;
    }
}

Client Code (controller.py)

...

def run_image(hostname, port, interrupt):
    global frame
    sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sck.connect((hostname, port))
    sck.settimeout(100)
    message = None
    while(interrupt.is_set()):
        buffer = message
        t = threading.Thread(target=decodeImg, args=(buffer,))
        t.start()
        answer = b" "
        message = b""
        sck.send(b"READY")
        print("Sent READY signal")
        answer = sck.recv(4)
        num = int(answer[0] << 24) + int(answer[1] << 16) + int(answer[2] << 8) + int(answer[3])
        print("num:", num)

        try:
            for i in range(math.ceil(num/1024)):
                print("Receiving... (", i,"of",math.ceil(num/1024),")")
                answer = sck.recv(1024) # row size

                if (i == 0):
                    print("#### HEAD ####")
                print(answer)
                message += answer
        except socket.timeout:
            pass
        print("#### TAIL ####")
        print(answer)
        print("###########################################")
        print("##### Received! message (len=", len(message),") #####")
        print("###########################################")
        t.join()
        #print("Updated")
    sck.close()
    print("run_image exited")

decompress = zlib.decompress
decode = base64.b64decode
toframe = pygame.image.fromstring

def decodeImg(raw):
    if (raw is None or raw == ""):
        return
    global frame
    print("Message length     :", len(raw))
    raw = decode(raw)
    print("Decoded length     :", len(raw))
    raw = decompress(raw)
    print("Decompressed length:", len(raw))
    frame = toframe(raw, image_size, 'RGB')
    if (frame is None):
        print("ERR None frame")

...

(Full client code is at https://pastebin.com/ut6L70XM)

Response

Server's log

...

connection from 127.0.0.1:61616
img_session waiting for READY signal...
[DEBUG] server.h: instance of img_session found
[INFO ] IMG READ
[INFO ] READY signal received...
[INFO ] img_stream.length() = 349544
len = 349544
img_session.cpp: bytes[] = 0 5 85 104
img_session.cpp: message.length() = 349548
img_session.cpp: HEAD:  UheJy0vYGZpLgSrIsJMkEmyARMwASZIBNkAiZgQpnQJowJY8Ix4ZUyI
jITqnrPnnvv62V7KBBCUP3xKyJTYtvKdv8p6Td+2u1jeR5gP/W5oaSVtVRuKh8FvhyTV7095VkDz1u+V
ZVrbR8nq79cxfbtFI/Pv5/stmcvpbDJVc0sreC8a1fZarPf7722mddXuBeNLPZTsRfFCiusqrzVdw3vO
1wqjl2rONf6VO0O1Lq1dSCqKmulbLbwp64jqhVchzaUuS3r8Lax2rLpRJWfV2MKm6fL8SuIC6z2x5CLq
VX5o58xrhTNbjjpvu4edjWW8Yt6X8Yqs5ZNK9iCu7eXWm1LteW9/X0D/ZCKG871OByLGmC3ay3vjU3F3
rsaitmdbFar3dL1VduS7nBlDWt7LXu1b8m3V9SwVvZW2/7+3Nr+/r+uXY01vy9lLVZmLdu7YFsre92Hr
b+XrRylHK0eRz1aO+p7qX2vx96wvNd7q/1o63fVeqmjvbfYxrbKj2NtGdV+71qOtKy9+9jXmUdpo+/js
OW9sar8e1dL6+O92Mpuh7dbnR1lViVt/Y6Tro39vYy9t/1drOvjKvn+2HdbWh+rGdj7bsYqbCddB2L7f
vTdVt4fsfKuoR8dy7vCVc8x5rGaus57jHasSxv9XWx9fP9+r8+DW+b7nH28D+32+73MPrE+bOO0LcOW9
8f35mOtzPf2MdZGlZ/Vtmt5l8PhOHDiqLXBtpza+F7KmPvgRhRGtUOHHKp2tyYNtootPHBdOOMx38t5j
HqM4te789rfy/sGHrxXuF3vjb2tjbh16+69b/tm6/sqj9u4bjWL2eGo7X2WceC8645V+43t3bbbxzn89
rK221XUdRPsRg0rOXiXum3B3XivzHHyhhxz1dPfV3mui8WucU5bxvsujmvYlvf6NVeBc66P/b1MFFjHv
rfs65avwnNOHD77+yzn+irnuXcWu/p52oroVjLmHjR5bKm3YjcA/bKdp/hCKVX4laoJrL9C7yuUP8vc1
1lz/Ydj
img_session.cpp: TAIL: sC5fp569IQ/fnp9NsY4EfMdjx8cR77ftC8426c2vPM27KFV/q+Crrf1tH
/fnYt+KoafkL+S9PaSs96I4qAmH7fHK/p2qcY6IFrOdR14uAs87ssSvGTw+rx/ib0ci+kXMpRGd2D2Sx
cHWl4rCb42a3MHfFyUzlPUTwhm5rA+DWF1Z6JPUtpwUqX90KiWpYMzOer1mRRv5K1yOUPHB36F/H6PjY
Hw4NkqwQb0WHrJn2ws85PB3owhbaZ4iECNkp2D0aylzFBMLVOK4USwzpKFWQZpFXd2lOm8eicRyLAuKL
y3Gou2FvyGELfezceNR8I31QOXroJiuOOH9OMhSpxVgmPGlQ4XMXw5by49xfWOSrSIlVVOoBlUxAnp9u
xwSTPras50W+LMZAE6iSe26qbDbAPcyfeyQpIQ+91BTfkKdYbcaJBWH35RQ8qVLKKuoMaAvQlnIBms4l
XNeiFJesperkO000isv5NQ48FkQZ1Ua/w7AqRI6M1EEGuc4sSw7GdFmibbpNMGUnyjCxtkGQzjnaouUr
2o+ajPq3JSM2pt03zroAyzDs1axEu05eHWd5WDo5ZC0rvEd8RR496IsuxuzWCyeNszWWM9lQOGMAmtI4
Dm1I/egQrD6knzxq3sY2oXHNbJk1i5At0YpbLrOntb1AgHKcuyxZ/pXBvIAMpDac/LVXtNhXvA6Hel2Z
PvwOBj2GaXgpt7Evaj/Vm6WeePzRrQ+nQfzYDPUHkr5/B0rGvgQKrgi/Hz+ehT4zLPgu9GCHvmNIYStZ
fC7L7luR1NVhtBwKmSuQNip9DjfAe9/0a0k2fkx0Vko4HKijDkh62YmXF5YTBgG3jSgmRv/qvbeIZwP/
NsG+HIdYWGoqmxX0C5PamF936VxCd0bYz8rtjnf1v35pj8soTpHasDdfTFsvGuuRdMtFLycccHIVgK3I
gX9UnpZIjQnaeD2HpK2vubAyVc8CrIXAzdemYjOXHTo3EeI3/7RCR3yDA7qDsZIkNWj7vKl84XsQeH/A
X1TLNQ=

...

Client's log

Sent READY signal
num: 349544
Receiving... ( 0 of 342 )
#### HEAD ####
b'eJy0vYGZpLgSrIsJMkEmyARMwASZIBNkAiZgQpnQJowJY8Ix4ZUyIjITqnrPnnvv62V7KBBCUP3xKyJTYtvKdv8p6Td+2u1jeR5gP/W5oaSVtVRuKh8FvhyTV7095VkDz1u+VZVrbR8nq79cxfbtFI/Pv5/stmcvpbDJVc0sreC8a1fZarPf7722mddXuBeNLPZTsRfFCiusqrzVdw3vO1wqjl2rONf6VO0O1Lq1dSCqKmulbLbwp64jqhVchzaUuS3r8Lax2rLpRJWfV2MKm6fL8SuIC6z2x5CLqVX5o58xrhTNbjjpvu4edjWW8Yt6X8Yqs5ZNK9iCu7eXWm1LteW9/X0D/ZCKG871OByLGmC3ay3vjU3F3rsaitmdbFar3dL1VduS7nBlDWt7LXu1b8m3V9SwVvZW2/7+3Nr+/r+uXY01vy9lLVZmLdu7YFsre92Hrb+XrRylHK0eRz1aO+p7qX2vx96wvNd7q/1o63fVeqmjvbfYxrbKj2NtGdV+71qOtKy9+9jXmUdpo+/jsOW9sar8e1dL6+O92Mpuh7dbnR1lViVt/Y6Tro39vYy9t/1drOvjKvn+2HdbWh+rGdj7bsYqbCddB2L7fvTdVt4fsfKuoR8dy7vCVc8x5rGaus57jHasSxv9XWx9fP9+r8+DW+b7nH28D+32+73MPrE+bOO0LcOW98f35mOtzPf2MdZGlZ/Vtmt5l8PhOHDiqLXBtpza+F7KmPvgRhRGtUOHHKp2tyYNtootPHBdOOMx38t5jHqM4te789rfy/sGHrxXuF3vjb2tjbh16+69b/tm6/sqj9u4bjWL2eGo7X2WceC8645V+43t3bbbxzn89rK221XUdRPsRg0rOXiXum3B3XivzHHyhhxz1dPfV3mui8WucU5bxvsujmvYlvf6NVeBc66P/b1MFFjHvrfs65avwnNOHD77+yzn+irnuXcWu/p52oroVjLmHjR5bKm3YjcA/bKdp/hCKVX4laoJrL9C7yuUP8vc11lz/Ydjfr/M'

... 

Receiving... ( 63 of 342 )
b'JGPgXARwxzUdiKKMzTRc4OaHzidOcNJrBQHbD+M6nZ6PKBrUlsRiq9KhIQY/JTOVqSd5ufKNbatfImARAtARU6pbAa2mh0YHVQrR04zzmEVA5Pnrc4Gt4kUiaWzVqu/Nv7tWw63102hqqZHgKyrXxsBq8emZeBQCriWU18xHfJZ4Fwq6+8cs0rMdIeCKitrmHTO6gFcZ1W3Q1zTDLQ6rbpMNOhSTBjIQrOwv+7xLiykV3pxd4SpDFC+JxcWaX5mUXWVBI7PLjG5EkKM0GAHgkfm8ynnGLITQs97JCiW/KwLEc+ZVW0IOrwJxiw3ZDnr4z7kRtOnjoX9ZiDSCvgwHP6mCn09UB+9AsCVnhSlt/TpQo0T+ShQnmXDwNpXLPSAQFCxT+6CDTYROLxKRWI6eCQlz57ipCo7OrIuZYFLQdzaATzI62KStlL8tv/CWbwmLUdsnul0T5hyyB/h7nFeiodJy7E5rStiJfCJXxwLl7zv2y/cDI0Apa37CfmjK4SO9ns7qcPgeoYt9zZcGLbeMULcaNMAYL3653y4Wy3ZO2dHKiPv6v8vg/Z+j7yDSy8IWHuC4bYB35eHfEei08vZDUpexFVfB8HEsyDJSIN4/7/uINS/0ptkjI+vi8DJiyijSO/xHfvInih/DS9aEvBv99pLhk7+RV/5GFD0pgcQPBYm0nwmLgE0PG9b/h8LQ+FvlPwf7Sp346Adq5nN6/nNXrhSN2RmpNfDXeUvX3mqRz/3DA6TnvZ16VGOCEgrag9DZHG7hZAZwE3brJ+Wl+BLRRMzvGpNq2mHqGU0cmeSdfJyAvciWWd1XQ2J2/JlGMu4AN7VfjqRiE9ctjQ2kp1sl2npXPKIqbhsDJ30cM4E7yOjIjjNs8yOOIZfb859t/y5sk2GueLHypprjHk2nlfzcKbQrQ95qdVVVOIyuGqz89XqiCqAui8K+CBP3pgjyhGwo9MXSqyCZrf8zrWtMaTQhGPJ5EWcfaf5BK/hlEZJiwTEX0oNuNFUw/Ge50BvJ'
Receiving... ( 64 of 342 )
b''
Receiving... ( 65 of 342 )
b''

...

Receiving... ( 340 of 342 )
b''
Receiving... ( 341 of 342 )
b''
#### TAIL ####
b''
###########################################
##### Received! message (len= 65532 ) #####
###########################################
Message length     : 65532
Decoded length     : 49149

Exception in thread Thread-4:
Traceback (most recent call last):
  File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\controller.py", line 204, in decodeImg
    raw = decompress(raw)
zlib.error: Error -5 while decompressing data: incomplete or truncated stream

Upvotes: 2

Views: 1408

Answers (1)

sehe
sehe

Reputation: 392954

You're sending from a local variable, but it goes out of scope before writing completes.

Make message a member variable.

NOTE: The img_lock is not thread safe

Here's a working demo:

Live On Coliru

#ifndef _IMG_SESSION_H_
#define _IMG_SESSION_H_

#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread.hpp>

struct session {
    session(boost::asio::io_service& svc) : svc(svc) {
    }

    void clean() {}
    boost::asio::io_service& svc;
    boost::asio::ip::tcp::socket _socket{svc};
    int max_length = 1024;
    //std::array<char, 1024> _data;
    char _data[1024];
};

class img_session : public session
{
public:
    img_session(boost::asio::io_service& io_service, std::string * img_stream, int*  img_lock);
    ~img_session();

    /* Starts the session. This is called in the constructor.*/
    void start();

private:
    void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
    void handle_write(const boost::system::error_code& error, size_t bytes_transferred);

    std::string * img_stream;
    int * img_lock;
    std::string message;
};

#endif /* _IMG_SESSION_H */

/* Public Methods */
img_session:: img_session(boost::asio::io_service& io_service, std::string * img_stream, int * img_lock) : session(io_service), img_stream(img_stream), img_lock(img_lock){
}

img_session::~img_session() { }

void img_session::start()
{
    std::cout << "connection from " << _socket.remote_endpoint() << "\n";
    std::cout << "img_session waiting for READY signal..." << std::endl;
    _socket.async_read_some(boost::asio::buffer(_data, max_length),
        boost::bind(&img_session::handle_read, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}

/* Private Methods */

void img_session::handle_read(const boost::system::error_code& error, size_t /*bytes_transferred*/)
{
    if (!error)
    {
        //char otherString[6];
        std::cout << "[INFO ] IMG READ" << std::endl;
        if (strncmp(_data, "READY", 5) == 0) {
            std::cout << "[INFO ] READY signal received..." << std::endl;
            while (*img_lock == 1) boost::this_thread::yield();
            *img_lock = 1;
            message = "0000" + *img_stream;
            size_t len = message.length();

            std::cout << "len = " << len << std::endl;

            message[0] = (len >> 24) & 0xFF;
            message[1] = (len >> 16) & 0xFF;
            message[2] = (len >> 8) & 0xFF;
            message[3] = len & 0xFF;
            std::cout << "img_session.cpp: bytes[] = " << (int) message[0] << " " << (int)message[1] << " " << (int)message[2] << " " << (int)message[3] << std::endl;
            std::cout << "img_session.cpp: message.length() = " << message.length() << std::endl;
            std::cout << "img_session.cpp: HEAD: " << message.substr(4, 1024) << std::endl;
            std::cout << "img_session.cpp: TAIL: " << message.substr(message.length() - 1024, message.length() - 1);


            boost::asio::async_write(_socket,
                boost::asio::buffer(message), 
                boost::bind(&img_session::handle_write, this,
                    boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

            *img_lock = 0;
        }
        else {
            std::cout << "[INFO ] READY signal not received..." << std::endl;
            /*
             *boost::asio::async_write(_socket,
             *    boost::asio::buffer("", 1),
             *    boost::bind(&img_session::handle_write, this,
             *        boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
             */
        }
        clean();
    }
    else
    {
        delete this;
    }
}

void img_session::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
{
    if (!error)
    {
        std::cout << "img_session waiting for READY signal..." << std::endl;
        _socket.async_read_some(boost::asio::buffer(_data, max_length),
            boost::bind(&img_session::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
    else
    {
        std::cout << __FUNCTION__ << ":" << error.message() << "\n";
        delete this;
    }
}

int main() {
    boost::asio::io_service svc;
    std::string img_stream(300<<10, '*');
    int img_lock = 0;
    auto s = new img_session(svc, &img_stream, &img_lock);

    using boost::asio::ip::tcp;

    tcp::acceptor a(svc);
    a.open(tcp::v4());
    a.set_option(tcp::acceptor::reuse_address(true));
    a.bind({{}, 6767});
    a.listen(1);
    a.accept(s->_socket);

    s->start();

    svc.run();
}

This, when using a client

netcat localhost 6767 | wc 
READY

Prints

  0       1  307204

Upvotes: 5

Related Questions