drulex
drulex

Reputation: 47

Boost asio: unable to acknowledge a file transfer

I am using boost asio to perform file transfers over TCP. The file transfer works, however when I decide to implement a simple acknowledge message from server to client by chaining async_write (on server) and async_read_until (on client), I observe a weird behaviour: the file is no longer received properly on the server side. A few hundred bytes before the end of the transfer the server just doesn't receive any more bytes and therefore never calls the async_write responsible for acknowledging the file transfer.

This seems to happen when I call async_read_until in my client after I am done writing the file. For some reason it affects the current file transfer.

Client implementation:

#include "StdAfx.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include "AsyncTCPClient.h"


// Prepares the upload of the file at iPath to the server named by iServerIP
// (expected in "host:port" form) and starts the asynchronous name resolution.
// Nothing is sent from here; the request header is only staged in mRequest.
AsyncTCPClient::AsyncTCPClient(boost::asio::io_service& iIoService, const std::string& iServerIP, const std::string& iPath)
    : mResolver(iIoService), mSocket(iIoService)
{
    // Split "host:port". Without a ':' the constructor returns without logging
    // and without starting any asynchronous operation.
    size_t wPos = iServerIP.find(':');
    if(wPos==std::string::npos)
    {
        return;
    }
    std::string wPortStr = iServerIP.substr(wPos + 1);
    std::string wServerIP = iServerIP.substr(0, wPos);

    // Open positioned at the end (ate) so tellg() yields the file size.
    mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate);
    if(!mSourceFile)
    {
        LOG(LOGERROR) << "Failed to open file: " << iPath;
        return;
    }
    size_t wFileSize = mSourceFile.tellg();
    // Rewind so the later reads start at the first byte.
    mSourceFile.seekg(0);
    // Header layout: "<path>\n<size>\n\n"; the file bytes follow it on the wire.
    std::ostream wRequestStream(&mRequest);
    wRequestStream << iPath << "\n" << wFileSize << "\n\n";

    LOG(LOGINFO) << "File to transfer: " << iPath;
    LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes";

    // Resolution completes in HandleResolve.
    tcp::resolver::query wQuery(wServerIP, wPortStr);
    mResolver.async_resolve(wQuery, boost::bind(&AsyncTCPClient::HandleResolve, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator));

}

// Empty destructor body: no explicit cleanup is performed here.
AsyncTCPClient::~AsyncTCPClient()
{
}

// Completion handler for async_resolve: on success tries to connect to the
// first endpoint and hands the next iterator position to HandleConnect; on
// failure only logs the error.
void AsyncTCPClient::HandleResolve(const boost::system::error_code & iErr, tcp::resolver::iterator iEndpointIterator)
{
    if(!iErr)
    {
        tcp::endpoint wEndpoint = *iEndpointIterator;
        // ++iEndpointIterator: HandleConnect receives the endpoint to try next.
        mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator));
    }
    else
    {
        LOG(LOGERROR) << "Error: " << iErr.message();
    }
}

// Completion handler for async_connect. On success sends the request header
// staged in mRequest; on failure retries with the next resolved endpoint, or
// logs the error once the endpoint list is exhausted.
void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator)
{
    if(!iErr)
    {
        // Header first; HandleWriteFile then streams the file chunk by chunk.
        boost::asio::async_write(mSocket, mRequest, boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
    }
    else if(iEndpointIterator != tcp::resolver::iterator())
    {
        // A default-constructed iterator marks the end of the endpoint list.
        mSocket.close();
        tcp::endpoint wEndpoint = *iEndpointIterator;
        mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator));
    }
    else
    {
        LOG(LOGERROR) << "Error: " << iErr.message();
    }
}

// Completion handler for every write (header and file chunks). Reads the next
// chunk of mSourceFile into mBuffer and sends it, re-arming itself as the
// handler. Once the stream is no longer good it starts waiting for the
// server's "ack[;]" message. Throws std::exception on a write error.
void AsyncTCPClient::HandleWriteFile(const boost::system::error_code& iErr)
{
    if(!iErr)
    {
        if(mSourceFile)
        {
            mSourceFile.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());

            // EOF reached
            // NOTE(review): this path returns without starting the acknowledge read below.
            if(mSourceFile.gcount() <= 0)
            {
                return;
            }

            //LOG(LOGTRACE) << "Send " << mSourceFile.gcount() << "bytes, total: " << mSourceFile.tellg() << " bytes.\n";
            boost::asio::async_write(mSocket, boost::asio::buffer(mBuffer.c_array(), mSourceFile.gcount()), boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
        }
        else
        {
            LOG(LOGINFO) << "File transfer done";
            /// async_read_until responsible for receiving a simple "ack[;]" once the server is done receiving.
            /// When I don't do this and simply return, the server receives the file properly and sends the ack.
            /// When I do this, the server never receives the full file and simply waits for the remaining bytes, which never arrive.
            boost::asio::async_read_until(mSocket, mRecBuf, "[;]", boost::bind(&AsyncTCPClient::HandleReceiveAcknowledge, this, boost::asio::placeholders::error));
        }
    }
    else
    {
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }
}
// Completion handler for the acknowledge read: logs everything currently held
// in mRecBuf on success, logs the error and throws std::exception otherwise.
void AsyncTCPClient::HandleReceiveAcknowledge(const boost::system::error_code& iErr)
{
    if(!iErr)
    {
        // Drains the whole streambuf into a string just for logging.
        std::string wRecData((std::istreambuf_iterator<char>(&mRecBuf)), std::istreambuf_iterator<char>());
        LOG(LOGDEBUG1) << "Acknowledged this data: " << wRecData;
        return;
    }
    else
    {
        // On error: log and throw (no explicit resource release happens here).
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }
}

Server implementation:

#include "StdAfx.h"
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <fstream>
#include <boost/enable_shared_from_this.hpp>
#include "AsyncTCPClient.h"
#include "AsyncTCPServer.h"
#include "Debug.h"


// Opens an IPv4 acceptor on iPort, queues a single accept for one connection
// that will write to iFilePath, and then runs the io_service from inside the
// constructor, so construction blocks until run() returns.
AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath)
    :mAcceptor(mIoService, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort), true)
{
    mAsyncTCPConnectionPtr wNewConnection(new AsyncTCPConnection(mIoService, iFilePath));
    // The connection pointer is bound into the handler, which keeps it alive until the accept completes.
    mAcceptor.async_accept(wNewConnection->Socket(), boost::bind(&AsyncTCPServer::HandleAccept, this, wNewConnection, boost::asio::placeholders::error));
    mIoService.run();
}

// Stops the io_service owned by this server.
AsyncTCPServer::~AsyncTCPServer()
{
    mIoService.stop();
}

// Completion handler for async_accept: starts the accepted connection, or
// logs the error. No further accept is queued from here.
void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code& iErr)
{
    if (!iErr)
    {
        iCurConnection->Start();
    }
    else
    {
        BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message();
    }
}

Connection implementation:

#include "StdAfx.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <fstream>
#include "Debug.h"
#include "AsyncTCPConnection.h"

// Creates an unconnected socket on iIoService and stores iFilePath as the
// destination path; the expected file size starts at 0 until a header is parsed.
AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service& iIoService, const std::string iFilePath)
    : mSocket(iIoService), mFileSize(0), mFilePath(iFilePath)
{
}

// Empty destructor body: no explicit cleanup is performed here.
AsyncTCPConnection::~AsyncTCPConnection()
{
}

// Starts reading the request header, which ends at the first blank line
// ("\n\n"). shared_from_this() is bound into the handler so the connection
// stays alive while the read is pending.
void AsyncTCPConnection::Start()
{
    LOG(LOGINFO) << "Start";
    async_read_until(mSocket, mRequestBuffer, "\n\n", boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

// Completion handler for the header read. Extracts the sender's path and the
// file size from mRequestBuffer, opens the output file, writes whatever file
// bytes were read together with the header, and schedules the next read.
void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code& iErr, std::size_t iBytesTransferred)
{
    if(iErr)
    {
        return HandleError(__FUNCTION__, iErr);
    }
    LOG(LOGTRACE) << "(" << iBytesTransferred << ")" << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size() << ", max_size = " << mRequestBuffer.max_size();

    std::istream wRequestStream(&mRequestBuffer);
    std::string wFilePath;
    wRequestStream >> wFilePath;
    wRequestStream >> mFileSize;
    // Consumes two characters after the size without checking what they are.
    wRequestStream.read(mBuffer.c_array(), 2);

    // The file is written to mFilePath (given at construction), not to the path sent by the client.
    mOutputFile.open(mFilePath, std::ios_base::binary);

    if(!mOutputFile)
    {
        // NOTE(review): the message prints wFilePath although mFilePath is the path that was opened.
        LOG(LOGERROR) << "Failed to open: " << wFilePath;
        return;
    }
    // Flush the bytes that are still in the request buffer after the header.
    do
    {
        wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
        LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes";
        mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount());
    }
    while(wRequestStream.gcount() > 0);
    // Always asks for a full mBuffer.size() bytes, regardless of how many bytes are still expected.
    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

// Completion handler for each file-content read. Appends the received bytes to
// the output file, sends "ack[;]" once at least mFileSize bytes were written,
// and otherwise schedules another full-buffer read.
void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code& iErr, std::size_t iBytesTransferred)
{
    if(iBytesTransferred>0)
    {
        mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred);
        LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes";
        if (mOutputFile.tellp()>=(std::streamsize)mFileSize)
        {
            /// The file is received; send a simple ack message.
            /// This code is never reached when I launch the last async_read_until on the client side.
            // NOTE(review): the handler is bound to `this` rather than shared_from_this(),
            // and there is no return after this write, so the code below still runs.
            char *wAckMsg = "ack[;]";
            boost::asio::async_write(mSocket, boost::asio::buffer(wAckMsg, strlen(wAckMsg)), boost::bind(&AsyncTCPConnection::HandleAcknowledge, this, boost::asio::placeholders::error));
        }
    }
    if(iErr)
    {
        return HandleError(__FUNCTION__, iErr);
    }
    // Requests mBuffer.size() bytes again, whatever the number of bytes still outstanding.
    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

// Completion handler for the acknowledge write: logs success, or logs the
// error and throws std::exception.
void AsyncTCPConnection::HandleAcknowledge(const boost::system::error_code& iErr)
{
    if(!iErr)
    {
        LOG(LOGDEBUG1) << "Message acknowledged";
        return;
    }
    else
    {
        // On error: log and throw (no explicit resource release happens here).
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }
}

// Logs the name of the failing function together with the error code and its
// message; performs no other action.
void AsyncTCPConnection::HandleError(const std::string& function_name, const boost::system::error_code& err)
{
    LOG(LOGERROR) << " in " << function_name <<" due to " << err <<" " << err.message();
}

Code to send file:

        boost::asio::io_service wIoService;
        AsyncTCPClient client(wIoService, iServerIP, iFilePath);
        wIoService.run();

I have been searching for an answer for a long time, but I simply cannot understand what is happening or why. Thanks in advance.

Upvotes: 3

Views: 471

Answers (1)

sehe
sehe

Reputation: 392893

On the server (receiving) side, you repeat

async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),
           boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
                       boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

until you're done. This always seemed to work well, because the last read would return with eof:

This function is used to asynchronously read a certain number of bytes of data from a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:

  • The supplied buffer is full (that is, it has reached maximum size).
  • An error occurred.

But now, since the client does not close the socket (because it's waiting for ACK), you keep trying to read until you fill the whole buffer. If your file happened to be of the right size you might achieve it by accident.


Other problems:

char *wAckMsg = "ack[;]";
boost::asio::async_write(
    mSocket, boost::asio::buffer(wAckMsg, strlen(wAckMsg)),
    boost::bind(&AsyncTCPConnection::HandleAcknowledge, this, boost::asio::placeholders::error));

These few lines contain several errors:

  • string literals can not be treated as char*, instead MUST be treated as char const (&)[]
  • the buffer points to that local variable. You're, again, probably lucky because string literals usually reside in a static data segment, and the local was merely a pointer to it, but this is not something you should rely on. The local variable doesn't exist during the asynchronous operation, which might lead to Undefined Behaviour depending on the compiler used
  • you bind the completion handler to this instead of shared_from_this(). This means that the last reference held through mAsyncTCPConnectionPtr may be released, so that ~AsyncTCPConnection runs. This will destroy the socket, which may or may not happen before the Ack has been sent completely. In a multi-threaded server this could easily lead to Undefined Behaviour due to the data race.
  • after posting the async_write, you do not exit the function. This means that subsequent code is still executed, including the next async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), ....

I'd suggest the following as a minimum fix:

static char const *s_wAckMsg = "ack[;]";
boost::asio::async_write(
    mSocket, boost::asio::buffer(s_wAckMsg, strlen(s_wAckMsg)),
    boost::bind(&AsyncTCPConnection::HandleAcknowledge, shared_from_this(), boost::asio::placeholders::error));
return;

Minor other issues

Request parsing is not robust (errors aren't handled, neither is the format checked. At the end you just consume 2 characters blindly, which are just assumed to be '\n\n', but you never know).

You report the wrong filename if opening the output file fails.

You can use boost::asio::async_connect instead of the awkward handler chain.

There's no need to build a string using istreambuf_iterator<char> if all you want to do is print it.

Let's attempt to fix it

I wrote a metric ton of code to compile the code, and attempted to fix the bug by reducing the read size.

The async_read call is duplicated, so let's remove the duplication:

// Schedules the next content read, asking for no more than the bytes that are
// still missing from the output file (capped at the buffer size).
void AsyncTCPConnection::DoReceiveFileContent() {
    const std::streamoff wWritten = mOutputFile.tellp();

    // Bytes still outstanding; stays 0 once the file is complete.
    size_t expect = 0;
    if (mFileSize > wWritten) {
        expect = mFileSize - wWritten;
    }
    LOG(LOGDEBUG1) << "expectedContent: " << expect;

    if (mBuffer.size() < expect) {
        expect = mBuffer.size();
    }

    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), expect),
               boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
                           boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

Now we just call DoReceiveFileContent() whenever we want to schedule more read operations.

DEMO

Live On Coliru

//#include "AsyncTCPConnection.h"
//#include "Debug.h"
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include <fstream>
#include <iostream>

/// Fixed-size, zero-initialised 1024-byte character buffer that exposes only
/// size() and c_array().
struct Buffer {
    /// Mutable pointer to the first byte.
    char *c_array() { return m_array; }
    /// Read-only pointer to the first byte.
    char const *c_array() const { return m_array; }
    /// Capacity in bytes (always 1024).
    size_t size() const { return sizeof m_array; }

  private:
    char m_array[1024] = {};
};

/// One log line written to std::cout: the constructor prints the tag followed
/// by a tab, every `<<` appends a value, and the destructor of the last
/// still-armed object in the chain ends the line with std::endl.
struct LogTx {
    LogTx(std::string const &name) { std::cout << name << "\t"; }

    /// Moving transfers the duty of terminating the line to the new object.
    LogTx(LogTx &&other) : armed(other.armed) { other.armed = false; }

    ~LogTx() {
        if (!armed) {
            return;
        }
        std::cout << std::endl;
    }

    /// Streams a single value and passes the transaction on to the next `<<`.
    template <typename T> friend LogTx operator<<(LogTx tx, T &&arg) {
        std::cout << arg;
        return tx;
    }

  private:
    bool armed = true;
};

#define LOG(x) LogTx("LOG:" #x)
#define BIOLOG(x) LogTx("BIOLOG:" #x)
using boost::asio::ip::tcp;

/// Server-side state for one file upload. Instances are handed to handlers via
/// shared_from_this(), so they must be owned by a boost::shared_ptr.
struct AsyncTCPConnection : boost::enable_shared_from_this<AsyncTCPConnection> {
    AsyncTCPConnection(boost::asio::io_service &iIoService, const std::string iFilePath);
    ~AsyncTCPConnection();
    /// Begins reading the request header (terminated by "\n\n").
    void Start();
    /// Parses the header, opens the output file and schedules content reads.
    void HandleReadRequest(const boost::system::error_code &iErr, std::size_t iBytesTransferred);
    /// Stores one received chunk; sends the acknowledge once the file is complete.
    void HandleReadFileContent(const boost::system::error_code &iErr, std::size_t iBytesTransferred);
    /// Schedules a read for the bytes that are still missing.
    void DoReceiveFileContent();
    /// Completion handler for the acknowledge write.
    void HandleAcknowledge(const boost::system::error_code &iErr);
    /// Logs an error together with the name of the function that hit it.
    void HandleError(const std::string &function_name, const boost::system::error_code &err);

    /// Socket to pass to the acceptor.
    tcp::socket &Socket() { return mSocket; }

  private:
    boost::asio::streambuf mRequestBuffer; // header bytes plus any content read along with them
    Buffer mBuffer;                        // scratch buffer for content reads
    tcp::socket mSocket;
    std::streamsize mFileSize;             // size announced in the request header
    std::string mOutputFilePath;           // where the received bytes are written
    std::ofstream mOutputFile;
};

/// Creates the (still unconnected) socket on iIoService and records the path
/// the received file will be written to. No I/O is started here.
AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service &iIoService, const std::string iFilePath)
        : mSocket(iIoService),
          mFileSize(0),
          mOutputFilePath(iFilePath) {
}

/// Nothing beyond member destruction is required.
AsyncTCPConnection::~AsyncTCPConnection() = default;

void AsyncTCPConnection::Start() {
    LOG(LOGINFO) << "Start";
    async_read_until(mSocket, mRequestBuffer, "\n\n",
                     boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(),
                                 boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

/// Completion handler for the header read started in Start().
///
/// Parses the "<path>\n<size>\n\n" header out of mRequestBuffer, validates it,
/// opens the output file, writes any file bytes that were read together with
/// the header, and then schedules the reads for the rest of the content.
///
/// @param iErr               result of the header read; on error only HandleError is called
/// @param iBytesTransferred  number of bytes up to and including the "\n\n" delimiter
void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code &iErr, std::size_t iBytesTransferred) {
    if (iErr) {
        return HandleError(__FUNCTION__, iErr);
    }
    LOG(LOGTRACE) << "(" << iBytesTransferred << ")"
                  << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size();

    std::istream wRequestStream(&mRequestBuffer);
    std::string wFilePath;
    wRequestStream >> wFilePath;
    LOG(LOGTRACE) << "Original filename " << wFilePath;
    wRequestStream >> mFileSize;
    LOG(LOGTRACE) << "Original filesize " << mFileSize;

    // The size must be followed directly by the blank line that ends the
    // header; check it instead of discarding two arbitrary characters.
    char wTerminator[2] = { 0, 0 };
    wRequestStream.read(wTerminator, 2);
    const bool wHeaderOk =
        wRequestStream && wTerminator[0] == '\n' && wTerminator[1] == '\n' && mFileSize >= 0;
    if (!wHeaderOk) {
        LOG(LOGERROR) << "Request could not be parsed";
        return;
    }

    // Open (and thereby truncate) the output file only for a valid request.
    mOutputFile.open(mOutputFilePath, std::ios_base::binary);
    if (!mOutputFile) {
        LOG(LOGERROR) << "Failed to open: " << mOutputFilePath;
        return;
    }

    // async_read_until may have read past the delimiter: whatever is left in
    // the request buffer is already file content.
    do {
        wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
        LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes";
        mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount());
    } while (wRequestStream.gcount() > 0);

    DoReceiveFileContent();
}

/// Schedules the next content read, asking for no more than the bytes that are
/// still missing from the output file (capped at the buffer size).
void AsyncTCPConnection::DoReceiveFileContent() {
    const std::streamoff wWritten = mOutputFile.tellp();

    // Bytes still outstanding; stays 0 once the file is complete.
    size_t expect = 0;
    if (mFileSize > wWritten) {
        expect = mFileSize - wWritten;
    }
    LOG(LOGDEBUG1) << "expectedContent: " << expect;

    if (mBuffer.size() < expect) {
        expect = mBuffer.size();
    }

    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), expect),
               boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
                           boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

/// Completion handler for every content read scheduled by DoReceiveFileContent().
///
/// Appends the received bytes to the output file. As soon as at least mFileSize
/// bytes have been written it sends the "ack[;]" message and stops reading;
/// otherwise it reports a read error or schedules the next read.
///
/// The completion check is made even when no bytes were transferred: if the
/// whole file arrived together with the header (or the announced size is 0),
/// DoReceiveFileContent() issues a zero-length read, and without this check
/// the handler would re-arm that read forever and never acknowledge.
///
/// @param iErr               result of the read (may be set together with a partial transfer)
/// @param iBytesTransferred  number of bytes placed into mBuffer
void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code &iErr, std::size_t iBytesTransferred) {
    if (iBytesTransferred > 0) {
        mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred);
        LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes (+" << iBytesTransferred << ")";
    }

    if (mOutputFile.tellp() >= (std::streamsize)mFileSize) {
        LOG(LOGTRACE) << "Receive complete at " << mFileSize << " bytes";
        // Static storage: the buffer must outlive this call because the write
        // completes asynchronously. sizeof - 1 drops the terminating NUL.
        static char const s_wAckMsg[] = "ack[;]";
        boost::asio::async_write(
            mSocket, boost::asio::buffer(s_wAckMsg, sizeof(s_wAckMsg) - 1),
            boost::bind(&AsyncTCPConnection::HandleAcknowledge, shared_from_this(), boost::asio::placeholders::error));
        return;
    }

    if (iErr) {
        return HandleError(__FUNCTION__, iErr);
    }
    DoReceiveFileContent();
}

/// Completion handler for the acknowledge write.
/// Logs success; on failure logs the error and throws std::exception.
void AsyncTCPConnection::HandleAcknowledge(const boost::system::error_code &iErr) {
    if (iErr) {
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }

    LOG(LOGDEBUG1) << "Message ACK sent";
}

/// Logs which function failed together with the error code and its message.
void AsyncTCPConnection::HandleError(const std::string &function_name, const boost::system::error_code &err) {
    LOG(LOGERROR) << " in " << function_name
                  << " due to " << err
                  << " " << err.message();
}

#include <boost/thread.hpp>
//#include "AsyncTCPClient.h"

/// Client side of the transfer: resolves and connects to the server, sends a
/// header followed by the file content, then waits for the "[;]"-terminated
/// acknowledge.
struct AsyncTCPClient {
    /// iServerIP is expected as "host:port"; iPath is the file to send.
    AsyncTCPClient(boost::asio::io_service &iIoService, const std::string &iServerIP, const std::string &iPath);
    ~AsyncTCPClient();

    std::ifstream mSourceFile;                 // file being sent
    boost::asio::streambuf mRequest, mAckBuf;  // outgoing header / incoming acknowledge
    tcp::resolver mResolver;
    tcp::socket mSocket;

    /// Connects to the first resolved endpoint.
    void HandleResolve(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator);
    /// Sends the header on success, otherwise tries the next endpoint.
    void HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator);
    /// Sends the next chunk of the file, or starts waiting for the acknowledge.
    void HandleWriteFile(const boost::system::error_code &iErr);
    /// Logs the acknowledge received from the server.
    void HandleReceiveAcknowledge(const boost::system::error_code &iErr);

    Buffer mBuffer;                            // chunk currently being written
};

/// Stages the request header for the file at iPath and starts resolving the
/// server given as "host:port" in iServerIP. Returns early, without starting
/// any asynchronous work, when the address has no ':' or the file cannot be
/// opened.
AsyncTCPClient::AsyncTCPClient(boost::asio::io_service &iIoService, const std::string &iServerIP,
                               const std::string &iPath)
        : mResolver(iIoService), mSocket(iIoService) {
    const size_t wColon = iServerIP.find(':');
    if (wColon == std::string::npos) {
        return;
    }
    const std::string wHost = iServerIP.substr(0, wColon);
    const std::string wService = iServerIP.substr(wColon + 1);

    // Opening at the end makes tellg() report the file size.
    mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate);
    if (!mSourceFile) {
        LOG(LOGERROR) << "Failed to open file: " << iPath;
        return;
    }
    const size_t wFileSize = mSourceFile.tellg();
    mSourceFile.seekg(0);

    // Header layout: "<path>\n<size>\n\n".
    std::ostream wHeader(&mRequest);
    wHeader << iPath << "\n" << wFileSize << "\n\n";

    LOG(LOGINFO) << "File to transfer: " << iPath;
    LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes";

    tcp::resolver::query wQuery(wHost, wService);
    mResolver.async_resolve(wQuery,
                            [this](const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) {
                                HandleResolve(iErr, iEndpointIterator);
                            });
}

/// Nothing beyond member destruction is required.
AsyncTCPClient::~AsyncTCPClient() = default;

/// Completion handler for async_resolve: connects to the first endpoint and
/// passes the position of the next one on to HandleConnect.
void AsyncTCPClient::HandleResolve(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) {
    if (iErr) {
        LOG(LOGERROR) << "Error: " << iErr.message();
        return;
    }

    const tcp::endpoint wFirst = *iEndpointIterator;
    ++iEndpointIterator; // HandleConnect falls back to this one if the connect fails
    mSocket.async_connect(wFirst, boost::bind(&AsyncTCPClient::HandleConnect, this,
                                              boost::asio::placeholders::error, iEndpointIterator));
}

/// Completion handler for async_connect: sends the staged header on success,
/// retries with the next endpoint on failure, and logs the error once no
/// endpoint is left.
void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) {
    if (!iErr) {
        boost::asio::async_write(mSocket, mRequest,
                                 boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
        return;
    }

    // A default-constructed iterator marks the end of the endpoint list.
    const bool wExhausted = (iEndpointIterator == tcp::resolver::iterator());
    if (wExhausted) {
        LOG(LOGERROR) << "Error: " << iErr.message();
        return;
    }

    mSocket.close();
    const tcp::endpoint wNext = *iEndpointIterator;
    ++iEndpointIterator;
    mSocket.async_connect(wNext, boost::bind(&AsyncTCPClient::HandleConnect, this,
                                             boost::asio::placeholders::error, iEndpointIterator));
}

/// Completion handler for every write (the header and each file chunk).
///
/// Reads the next chunk of mSourceFile into mBuffer and sends it, re-arming
/// itself as the handler. When no data is left it starts waiting for the
/// server's "ack[;]" message.
///
/// A read that yields zero bytes (file size an exact multiple of the buffer
/// size, or an empty file) is treated as end of transmission too; returning
/// there would leave the client without a pending acknowledge read.
///
/// @param iErr  result of the previous write
/// @throws std::exception when the previous write failed
void AsyncTCPClient::HandleWriteFile(const boost::system::error_code &iErr) {
    if (iErr) {
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }

    if (mSourceFile) {
        mSourceFile.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
        const std::streamsize wChunkSize = mSourceFile.gcount();

        if (wChunkSize > 0) {
            // A short final chunk leaves the stream in a failed state, so the
            // next invocation skips this branch and goes on to the ack read.
            boost::asio::async_write(
                mSocket, boost::asio::buffer(mBuffer.c_array(), wChunkSize),
                boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
            return;
        }

        LOG(LOGINFO) << "EOF reached";
    }

    LOG(LOGINFO) << "File transmission done";
    // Wait for the "[;]"-terminated acknowledge the server sends once it has
    // received the whole file.
    boost::asio::async_read_until(
        mSocket, mAckBuf, "[;]",
        boost::bind(&AsyncTCPClient::HandleReceiveAcknowledge, this, boost::asio::placeholders::error));
}

/// Completion handler for the acknowledge read.
/// Logs the received data; on failure logs the error and throws std::exception.
void AsyncTCPClient::HandleReceiveAcknowledge(const boost::system::error_code &iErr) {
    if (iErr) {
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }

    // Streaming the streambuf pointer prints its buffered contents.
    LOG(LOGDEBUG1) << "Acknowledged this data: " << &mAckBuf;
}

//////////////////////////////////////////////////
//////////////////////////////////////////////////

#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include <fstream>
#include <iostream>
//#include "AsyncTCPClient.h"
//#include "AsyncTCPServer.h"
//#include "Debug.h"

/// Accepts a single connection and stores the uploaded file. The constructor
/// runs the io_service itself, so constructing the server blocks until run()
/// returns.
struct AsyncTCPServer {
    using mAsyncTCPConnectionPtr = boost::shared_ptr<AsyncTCPConnection>;

    /// Listens on iPort (IPv4); the received file is written to iFilePath.
    AsyncTCPServer(unsigned short iPort, const std::string iFilePath);
    ~AsyncTCPServer();
    /// Starts the accepted connection or logs the accept error.
    void HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code &iErr);

    // mIoService is declared before mAcceptor because the acceptor is
    // constructed from it in the constructor's initializer list.
    boost::asio::io_service mIoService;
    tcp::acceptor mAcceptor;
};

/// Opens the acceptor on iPort, queues one accept for a connection that writes
/// to iFilePath, and runs the io_service until it has no more work.
AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath)
        : mAcceptor(mIoService, tcp::endpoint(tcp::v4(), iPort), true) {
    mAsyncTCPConnectionPtr wConnection(new AsyncTCPConnection(mIoService, iFilePath));
    mAcceptor.set_option(tcp::acceptor::reuse_address(true));

    // The handler holds a copy of the connection pointer, keeping the
    // connection alive until the accept completes.
    tcp::socket &wPeer = wConnection->Socket();
    mAcceptor.async_accept(wPeer, [this, wConnection](const boost::system::error_code &iErr) {
        HandleAccept(wConnection, iErr);
    });

    mIoService.run();
}

/// Stops the io_service owned by this server.
AsyncTCPServer::~AsyncTCPServer() {
    mIoService.stop();
}

/// Completion handler for async_accept: starts the connection, or logs the
/// accept error. No further accept is queued.
void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code &iErr) {
    if (iErr) {
        BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message();
        return;
    }

    iCurConnection->Start();
}

int main() {
    boost::thread th([] {
        boost::asio::io_service wIoService;
        AsyncTCPClient client(wIoService, "127.0.0.1:6767", 
                //"/etc/dictionaries-common/words"
                "main.cpp"
            );
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
        wIoService.run();
    });

    AsyncTCPServer server(6767, "outputfile.txt");
}

Prints

LOG:LOGINFO File to transfer: main.cpp
LOG:LOGINFO Filesize: 12793 bytes
LOG:LOGINFO Start
LOG:LOGTRACE    (16), in_avail = 512, size = 512
LOG:LOGTRACE    Original filename main.cpp
LOG:LOGTRACE    Original filesize 12793
LOG:LOGTRACE    Write 496 bytes
LOG:LOGTRACE    Write 0 bytes
LOG:LOGDEBUG1   expectedContent: 12297
LOG:LOGINFO File transmission done
LOG:LOGTRACE    Received 1520 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 11273
LOG:LOGTRACE    Received 2544 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 10249
LOG:LOGTRACE    Received 3568 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 9225
LOG:LOGTRACE    Received 4592 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 8201
LOG:LOGTRACE    Received 5616 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 7177
LOG:LOGTRACE    Received 6640 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 6153
LOG:LOGTRACE    Received 7664 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 5129
LOG:LOGTRACE    Received 8688 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 4105
LOG:LOGTRACE    Received 9712 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 3081
LOG:LOGTRACE    Received 10736 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 2057
LOG:LOGTRACE    Received 11760 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 1033
LOG:LOGTRACE    Received 12784 bytes (+1024)
LOG:LOGDEBUG1   expectedContent: 9
LOG:LOGTRACE    Received 12793 bytes (+9)
LOG:LOGTRACE    Receive complete at 12793 bytes
LOG:LOGDEBUG1   Message ACK sent
LOG:LOGDEBUG1   Acknowledged this data: ack[;]

And indeed the file is identical:

d61f0515bc4ba003497d67e265b5e0bc  main.cpp
d61f0515bc4ba003497d67e265b5e0bc  outputfile.txt

Upvotes: 6

Related Questions