Rohit Chauhan
Rohit Chauhan

Reputation: 155

Boost Asio async_read and async_receive not getting called itself whenever there is a data on tcp socket But boost::asio::read got call automatically

I am making a bidirectional asynchronous (non blocking) TCP client/server socket communication in Linux where a server can handle multiple clients. I am using boost ASIO TCP library framework for my requirements.

In my project there are two processes: client and server.

I want to transfer a custom structure (which has some enums, strings and other kind of data) over the TCP. For that I use Boost Serialization.

In both server and client I have implemented startReading() and startWriting() function in async mode using async_read and async_write.

To test, I

The client is not getting the reply, whether I use async_read or async_receive.

If I am using a blocking asio::read in the client startReading(), the client gets the message.

But I want it async.

I am sharing the completed project code below, please check and give me some suggestions. I am using ubuntu 20.04 64 bit os, C++14, Boost 1.71.

Server Logs "Server Logs"

Client Read Logs (which is working) "Clients Read Logs"

Client async_read logs which is not working "Client async read logs"

test_server.cpp 'its server process main function'

//test_server.cpp
#include <iostream>
#include "tcpServer.h"

int main(int argc, char *argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Missing port no: Please pass port no argument in async_tcp_server\n";
      return 1;
    }
    boost::asio::io_context io_context;
    TcpServer server(io_context,std::atoi(argv[1]));
    io_context.run();
  }
  catch (std::exception &e)
  {
    std::cerr <<"Exception: " << e.what() << std::endl;
  }
  return 0;
}

This is tcpServer.h


//tcpServer.h
#ifndef TCP_SERVER_H
#define TCP_SERVER_H

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <string>
#include "AppPlugin.h"


/**
*   @class  TcpConnectionHandler tcpServer.h "server/tcpServer.h"
*   @brief  
*/
class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler>
{
private:
    tcp::socket serverSocket_;
    TcpConnectionHandler(boost::asio::io_context&);
    TestManagerComInfo messageInfo;

public:
    enum { max_length = 4096};
    char data[max_length];
    string message="ok client, I am sending the strucure!";
    
    typedef boost::shared_ptr<TcpConnectionHandler> pointer;
    // creating the pointer
    static inline pointer create(boost::asio::io_context& io_context)
    {
        return pointer(new TcpConnectionHandler(io_context));
    }
    //socket creation
    tcp::socket& socket()
    {
        return serverSocket_;
    }

    void StartReading();
    void StartWriting();
    void WriteDone(const boost::system::error_code&, size_t);
    void ReadDone(const boost::system::error_code&, size_t);
    void setMessageStruct(const TestManagerComInfo&);
    TestManagerComInfo& getMessageStruct()
    {
        return messageInfo;
    }
    void SockDiscNotifierServer();
};

/**
*   @class  TcpServer tcpServer.h "server/tcpServer.h"
*   @brief  
*/
class TcpServer 
{
private:
    tcp::acceptor acceptor_;
    boost::asio::io_context& io_context_local;
    boost::asio::io_context& getioContext()
    {
        return io_context_local;
    }
   void start_accept();

public:
    TcpServer(boost::asio::io_context&,short);  
};

#endif  // TCP_SERVER_H

This is tcpserver.cpp

//tcpServer.cpp
#include "tcpServer.h"


TcpConnectionHandler::TcpConnectionHandler(boost::asio::io_context &io_context) : serverSocket_(io_context)
{
messageInfo.requestType_ = kTerminate;
    messageInfo.name_ = "COM_IPC_GROUP_001";
    messageInfo.testLogType_ = {
        {"Test1", kTestPass},
        {"Test112", kTestFail},
        {"Test3", kTestAbort}
    };
    messageInfo.statusType_ = kCompleted;
}

void TcpConnectionHandler::setMessageStruct(const TestManagerComInfo &passedMessageStruct)
{
#if 0
    messageInfo.requestType_ = kTerminate;
    messageInfo.name_ = "COM_IPC_GROUP_001";
    messageInfo.testLogType_ = {
        {"Test1", kTestPass},
        {"Test2", kTestFail},
        {"Test3", kTestAbort}
    };
    messageInfo.statusType_ = kCompleted;
#endif

    messageInfo.requestType_ = passedMessageStruct.requestType_;
    messageInfo.name_ = passedMessageStruct.name_;
    for (const auto &entry : passedMessageStruct.testLogType_)
    {
        messageInfo.testLogType_[entry.first] = entry.second;
    }
    messageInfo.statusType_ = passedMessageStruct.statusType_;
}

void TcpConnectionHandler::StartReading()
{
    serverSocket_.async_read_some(
        boost::asio::buffer(data, max_length),
        boost::bind(&TcpConnectionHandler::ReadDone,
                    shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));    
}

void TcpConnectionHandler::StartWriting()
{
    string serializedData = serializeMessageStruct(getMessageStruct());
        serverSocket_.async_write_some(
        boost::asio::buffer(serializedData, max_length),
        boost::bind(&TcpConnectionHandler::WriteDone,
                    shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}
void TcpConnectionHandler::ReadDone(const boost::system::error_code &err, size_t bytes_transferred)
{
    if (!err)
    {
        cout << "server received data is '" << data << "'\n";
        StartWriting();
    }
    else
    {
        SockDiscNotifierServer();
        cerr << "Read error: " << err.message() << endl;
    }
}

void TcpConnectionHandler::WriteDone(const boost::system::error_code &err, size_t bytes_transferred)
{
    if (!err)
    {
        cout << "Server sent message!" << endl;
        serverSocket_.is_open()?cout<<"Server socket is still connected\n":cout<<"server socket got disconnected\n";
    }
    else
    {
        SockDiscNotifierServer();
        cerr << "Write error: " << err.message() << endl;
    }
}

void TcpConnectionHandler::SockDiscNotifierServer()
{
    cout<<"server notifier handler for socket disconnect\n";
}

/********************************************************************************************************************************************************************/

TcpServer::TcpServer(boost::asio::io_context &io_context, short port_No) : io_context_local(io_context), acceptor_(io_context,
                                                                                                                   tcp::endpoint(tcp::v4(), port_No))
{
    start_accept();
}

void TcpServer::start_accept()
{
    TcpConnectionHandler::pointer connection = TcpConnectionHandler::create(getioContext());
        acceptor_.async_accept(connection->socket(), [this,connection](boost::system::error_code ec) {
        std::cout << "async_accept -> " << ec.message() << "\n";
        if (!ec) {
            connection->StartReading();
            start_accept();
        }
    });
}

test_client.cpp 'its client process main function'

//test_client.cpp
#include <iostream>

#include "tcpClient.h"

// Client side
int main(int argc, char *argv[])
{
    // we need 2 things: ip address and port number, in that order
    if (argc != 3)
    {
        cerr << "Usage: ip_address port" << endl;
        exit(0);
    }
    char *serverIp = argv[1];
    char *port = argv[2];
    boost::asio::io_context io_context;
    TcpClient tcpcli(io_context,serverIp, port);
     std::thread io_thread([&io_context]() {
        io_context.run();
    });
    tcpcli.StartWriting();
    sleep(1);
    tcpcli.StartReading();
    io_thread.join();
    return 0;
}

This is tcpClient.h

//tcpClient.h
#ifndef TCP_CLIENT_H
#define TCP_CLIENT_H

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <cstdlib>
#include <iostream>
#include <string>

#include "AppPlugin.h"



/**
 *   @class  TcpClient tcpClient.h "client/tcpClient.h"
 *   @brief
 */
class TcpClient
{
public:
    TcpClient(boost::asio::io_context& io_context, string serverIp, string port);
    void readData(const boost::system::error_code&, size_t);
    void sendData(const boost::system::error_code&, size_t);
    enum
    {
        max_length = 4096
    };
    void SockDiscNotifierClient();
    boost::asio::ip::tcp::socket socketClient_;
    void StartReading();
    void StartWriting();

private:
    string serverIp_, port_;
    bool connectStatus;
    boost::asio::streambuf recvBuffer_;
    boost::asio::io_context& io_context_local;
    boost::asio::io_context& getioContext()
    {
        return io_context_local;
    }
    //TestManagerComInfo DeserializeMessageStruct(boost::asio::streambuf&);

};

#endif  // TCP_CLIENT_H

This is tcpClient.cpp In this code I have implemented 3 startReading() and startWriting() functions. The first is implemented by me and working second I have reffered from google and boost documentation 3rd I have taken reference from chatgpt.

//tcpClient.cpp
#include "tcpClient.h"

TcpClient::TcpClient(boost::asio::io_context& io_context, string serverIp, string port)
    : serverIp_(serverIp), port_(port), socketClient_(io_context), io_context_local(io_context)
{
    connectStatus = false;
    if(!connectStatus)
    {
        socketClient_.connect(tcp::endpoint(
        boost::asio::ip::address::from_string(serverIp_), std::stoi(port_)));
        connectStatus = true;
    }
    std::cout << "Client Started: \n"; 
}

//This StartReading is working
#if 1 //first startreading
void TcpClient::StartReading()
{
    // getting response from server
    cout<<"----->TcpClient::StartReading\n";
    boost::system::error_code error;
    //boost::asio::streambuf receiveBuff;
    boost::asio::read(socketClient_, recvBuffer_, boost::asio::transfer_all(), error);
    if (error && error != boost::asio::error::eof)
    {
        SockDiscNotifierClient();
        cout << "receive failed: " << error.message() << endl;
    }
    else
    {
        DeserializeMessageStruct(recvBuffer_);        
    }
}
#endif

//This StartReading is not working
#if 0 //second startreading
void TcpClient::StartReading()
{
    cout << "----->TCPClient::StartReading\n";
    boost::asio::async_read(socketClient_, recvBuffer_, boost::asio::transfer_all(),
                            [this](const boost::system::error_code &error, std::size_t bytes_transferred)
                            {
                                cout<<"in TCPClient::StartReading labda\n";
                               // getioContext().run();
                                if (!error)
                                {
                                    std::string receivedData(boost::asio::buffer_cast<const char*>(recvBuffer_.data()), boost::asio::buffer_size(recvBuffer_.data()));
                                    cout<<"deserializing client received\n"<<receivedData;
                                    recvBuffer_.consume(bytes_transferred);
                                    StartReading();
                                    //DeserializeMessageStruct(recvBuffer_);
                                }
                                else if (error == boost::asio::error::eof)
                                {
                                    std::cerr << "Server closed the connection." << std::endl;
                                    socketClient_.close(); // Close the socket
                                }
                                else
                                {
                                    std::cerr << "Client: Error reading data: " << error.message() << std::endl;
                                }
                            });
                            
}
#endif

//This is suggested by chatgpt and its also not working
#if 0 //third startreading
void TcpClient::StartReading() {
    // Create a lambda function that encapsulates the StartReading logic
    boost::asio::io_context& io_context = static_cast<boost::asio::io_context&>(socketClient_.get_executor().context());
    auto readOperation = [this,&io_context]() {
        cout << "----->TCPClient::StartReading\n";
        socketClient_.async_receive(
        boost::asio::buffer(recvBuffer_.prepare(max_length)),
        [this](const boost::system::error_code &err, size_t bytes_transferred)
        {
            if (!err)
            {
                //recvBuffer_.commit(bytes_transferred); // Commit the received data
                DeserializeMessageStruct(recvBuffer_);
            }
            else
            {
                SockDiscNotifierClient();
                cerr << "Read error: " << err.message() << endl;
            }
        });
        io_context.run();
    };
    std::thread readingThread(readOperation);

    readingThread.join();
}
#endif


#if 1 //First StartWriting
void TcpClient::StartWriting()
{
     // request/message from client
    const string msg = "Hello Server Please send me the strucure!\n";
    cout<<"----->TcpClient::StartWritin\n";

    socketClient_.async_write_some(
        boost::asio::buffer(msg, max_length),
        boost::bind(&TcpClient::sendData, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

}
#endif

#if 0 //Second StartWriting
void TcpClient::StartWriting()
{
    // Request/message from client
    const string msg = "Hello Server Please send me the structure!\n";
    cout << "----->TcpClient::StartWriting\n";

    // Start an asynchronous write operation
    socketClient_.async_write_some(
        boost::asio::buffer(msg, max_length),
        [this](const boost::system::error_code &error, std::size_t bytes_transferred)
        {
            if (!error)
            {
                cout << "Client Successfully Sent Message!\n";
                // Start reading after writing is complete
                StartReading();
            }
            else
            {
                std::cerr << "Send error: " << error.message() << std::endl;
            }
        });
}
#endif 

#if 0 //Third StartWriting
void TcpClient::StartWriting() {
    // Request/message from client
    boost::asio::io_context& io_context = static_cast<boost::asio::io_context&>(socketClient_.get_executor().context());  
    const string msg = "Hello Server Please send me the structure!\n";
    cout << "----->TcpClient::StartWriting\n";
    // Create a lambda function that encapsulates the StartWriting logic
    auto writeOperation = [this, &io_context, msg]() {
        socketClient_.async_write_some(
            boost::asio::buffer(msg, max_length),
            [this](const boost::system::error_code &error, std::size_t bytes_transferred) {
                if (!error) {
                    cout << "Client Successfully Sent Message!\n";
                    // Start reading after writing is complete
                     StartReading();
                } else {
                    std::cerr << "Send error: " << error.message() << std::endl;
                }
            });
            io_context.run();
    };

    // Create a thread and execute the lambda function
    std::thread writingThread(writeOperation);
   // Wait for the thread to finish
    writingThread.join();
}
#endif

void TcpClient::readData(const boost::system::error_code &err,
                         size_t bytes_transferred)
{
    if (!err)
    {
        cout << "Client Successfuly Read Message! ";
    }
    else
    {
        SockDiscNotifierClient();
        cerr << "Read error: " << err.message() << endl;
    }
}

void TcpClient::sendData(const boost::system::error_code &err,
                         size_t bytes_transferred)
{
    if (!err)
    {
        cout << "Client Successfuly Sent Message!\n";
    }
    else
    {
        SockDiscNotifierClient();
        cerr << "Send error: " << err.message() << endl;
    }
}

void TcpClient::SockDiscNotifierClient()
{
    cout<<"client notifier handler for socket disconnect";
    
}

This is AppPlugin.h, which contains serialize and deserialize function also the defination of structure


//AppPlugin.h
#ifndef APPPLUGIN_H
#define APPPLUGIN_H

#include <bits/stdc++.h>

#include <boost/serialization/map.hpp>
#include <boost/serialization/string.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>

using boost::asio::ip::tcp;
using std::cerr;
using std::cin;
using std::cout;
using std::endl;
using std::string;

enum RequestType
{
    kSatelliteStatus = 0x00,
    kApplicationPresence = 0x01,
    kExecuteTestSuite = 0x02,
    kTestSuiteLog = 0x03,
    kTerminate = 0x04
};

enum StatusType
{
    kInit = 0x00,
    kStarted = 0x01,
    kCompleted = 0x02
};

enum TestResultType
{
    kTestNotExecute,
    kTestPass,
    kTestFail,
    kTestAbort
};

using TestLogType = std::map<std::string, TestResultType>;

#pragma pack(push, 1)  // Pack the structure
struct TestManagerComInfo
{
    RequestType requestType_;
    std::string name_;
    TestLogType testLogType_;
    StatusType statusType_;

    template <typename Archive>
    void serialize(Archive& ar, const unsigned int version)
    {
        ar& requestType_;
        ar& name_;
        ar& testLogType_;
        ar& statusType_;
    }
};
#pragma pack(pop)  // Restore default packing

/**
* @fn serializeMessageStruct
* @brief Its for serialize the TestManagerComInfo struct and convert into the string
* @param messageInfoNeedtoSerialize is the argunment which is TestManagerComInfo's object
*/
inline string serializeMessageStruct(TestManagerComInfo &messageInfoNeedtoSerialize)
{
    std::ostringstream archiveStream;
    boost::archive::text_oarchive archive(archiveStream);
    archive << messageInfoNeedtoSerialize;
    return archiveStream.str();
}

inline TestManagerComInfo DeserializeMessageStruct(boost::asio::streambuf& deserializeBuff)
{
    TestManagerComInfo receivedMsgStruct;
    std::istream archiveStream(&deserializeBuff);
        boost::archive::text_iarchive archive(archiveStream);
        archive >> receivedMsgStruct;
        cout << "Received Request Type: "
             << static_cast<int>(receivedMsgStruct.requestType_) << std::endl;
        cout << "Received Name: " << receivedMsgStruct.name_ << std::endl;
        cout << "Received Test Log:" << endl;
        for (const auto &test : receivedMsgStruct.testLogType_)
        {
            cout << "  " << test.first << ": " << test.second << endl;
        }
        std::cout << "Received Status Type: "
                  << static_cast<int>(receivedMsgStruct.statusType_)
                  << std::endl;

    return receivedMsgStruct;
}



#endif  // APPPLUGIN_H

Upvotes: 1

Views: 306

Answers (1)

sehe
sehe

Reputation: 392833

A first question that rises is: why do you want to make the operation async.

A second question is: what implementation do you wish to land on. I'm going to assume implementation 2 - the one that you hand-wrote.

Regardless of everything, the problem is the same:

TcpClient   tcpcli(io_context, serverIp, port);
std::thread io_thread([&io_context]() { io_context.run(); });

tcpcli.StartWriting();
sleep(1);

The io_thread runs out of work. Whether it does so before StartWriting() is invoked is a race condition - it will depend on the weather conditions. However, the sleep(1) simply guarantees that io_thread is done¹.

You need a work guard of sorts to prevent the thread from exiting prematurely. My favorite approach is to use asio::thread_pool which does this internally:

boost::asio::thread_pool io_context(1); // single thread

TcpClient tcpcli(io_context.get_executor(), serverIp, port);

tcpcli.StartWriting();
io_context.join();

This requires you to modify your code to accept executors instead of hard-coding io_context&.

Why Is The Server Okay?

Glad you asked. The TcpServer is different, because it kicks of an infinite chain of async operations from inside its constructor, so there is always work:

TcpServer::TcpServer(boost::asio::io_context& io_context, std::uint16_t port_No)
    : io_context_local(io_context)
    , acceptor_(io_context, tcp::endpoint(tcp::v4(), port_No)) {
    start_accept();
}

Code Review

Your code has some issues.

  1. There's a lot of code that doesn't belong in header files.
  2. Similarly there's a lot of excess member visibility.
  3. The use of io_context& references throughout is unnecessarily limiting.
    • DeserializeMessageStruct not only doesn't belong in the header,
    • it shouldn't depend on Asio either
    • I would split the printing code out as well.
    • On serialization, make sure the archive is complete before extracting the string.
So we get `AppPlugin.cpp`:

    #include "AppPlugin.h"
    #include <ostream>
    #include <sstream>
    
    std::ostream& operator<<(std::ostream& os, TestManagerComInfo const& msg) {
        os << "Request Type: " << static_cast<int>(msg.requestType_) << std::endl;
        os << "Name: " << msg.name_ << std::endl;
        os << "Test Log:" << std::endl;
        for (auto const& [k, v] : msg.testLogType_) {
            os << "  " << k << ": " << v << std::endl;
        }
        os << "Status Type: " << static_cast<int>(msg.statusType_) << std::endl;
        return os;
    }
    
    std::string serializeMessageStruct(TestManagerComInfo& msg) {
        std::ostringstream oss;
        {
            boost::archive::text_oarchive archive(oss);
            archive << msg;
        } // SEHE: completes the archive
        return oss.str();
    }
    
    TestManagerComInfo DeserializeMessageStruct(std::streambuf& deserializeBuff) {
        TestManagerComInfo msg;
        std::istream archiveStream(&deserializeBuff);
        boost::archive::text_iarchive(archiveStream) >> msg;
        return msg;
    }

And then on usage, you write:

    TestManagerComInfo msg = DeserializeMessageStruct(recvBuffer_);
    std::cout << "RECEIVED: " << msg;
  1. You clearly didn't enable compiler warnings, or you would notice the type mismatches on ports and e.g. wrong member initialization orders, unused variables etc.

  2. Note that #pragma pack has no effect if you're serializing using Boost Serialization as you do. In fact, it's unportable by definition and can only be a performance pessimization as your struct isn't trivial, standard-layout or POD anyways.

  3. The dance with buffers is very inefficient:

    std::string receivedData(asio::buffer_cast<const char*>(recvBuffer_.data()),
                             asio::buffer_size(recvBuffer_.data()));
    std::cout << "deserializing client received\n" << receivedData;
    TestManagerComInfo msg = DeserializeMessageStruct(recvBuffer_);
    recvBuffer_.consume(bytes_transferred);
    

    This could "just" be

    std::cout << "RECEIVED: " << DeserializeMessageStruct(recvBuffer_);
    
  4. You also redundantly invoke StartReading from main, where it already happens from the completion handler in StartWriting.

  5. The message "Server socket is still connected" is misleading, as is_open doesn't imply that the peer hasn't shutdown the connection.

  6. tranfer_all() is the default completion condition for async_read.

  7. Your async_write_some needs to be asio::async_write to ensure the full buffer gets sent. You are passing a local variable as the buffer, which is UB because it doesn't live till completion of the async operation. Either make it static or e.g. a member variable.

  8. Applying the unrelated max_length to it is ... useless at best.

  9. SockDiscNotifierClient is misleading, as it's not always called in response to disconnection. That said, sendData/readData seem unnecessary now, so I'll just remove them.

  10. There's a bunch of inconsistent using namespace and std::endl vs '\n' (use std::endl if you expect a flush).

Hmm. Now I'm noticing the server also uses async_read_some and async_write_some instead of composed operations that avoid partial messages. I'll leave that as an exercise for the reader for now.

PS: later I changed the interface to take an istream instead of streambuf

¹ (unless somehow writing took longer than a second, which seems improbable)

PUTTING IT ALL TOGETHER

  • File tcpServer.h

     // tcpServer.h
     #ifndef TCP_SERVER_H
     #define TCP_SERVER_H
    
     #include "AppPlugin.h"
     #include <boost/asio.hpp>
     #include <string>
    
     /**
      *   @class  TcpServer tcpServer.h "server/tcpServer.h"
      *   @brief
      */
     class TcpServer {
       private:
         boost::asio::any_io_executor   io_context_local;
         boost::asio::ip::tcp::acceptor acceptor_;
         boost::asio::any_io_executor   getioContext() { return io_context_local; }
         void                           start_accept();
    
       public:
         TcpServer(boost::asio::any_io_executor, uint16_t);
     };
    
     #endif // TCP_SERVER_H
    
  • File tcpServer.cpp

     // tcpServer.cpp
     #include "tcpServer.h"
     #include <boost/bind/bind.hpp>
     #include <iomanip>
     #include <iostream>
     #include <memory>
    
     namespace asio = boost::asio;
     using asio::ip::tcp;
     using boost::system::error_code;
    
     /**
      *   @class  TcpConnectionHandler tcpServer.h "server/tcpServer.h"
      *   @brief
      */
     class TcpConnectionHandler : public std::enable_shared_from_this<TcpConnectionHandler> {
       private:
         tcp::socket serverSocket_;
         TcpConnectionHandler(asio::any_io_executor);
         TestManagerComInfo messageInfo;
    
         enum { max_length = 4096 };
         char        data[max_length];
         std::string message = "ok client, I am sending the structure!";
    
       public:
         using pointer = std::shared_ptr<TcpConnectionHandler>;
         // creating the pointer
         static inline pointer create(asio::any_io_executor io_context) {
             return pointer(new TcpConnectionHandler(io_context));
         }
         // socket creation
         tcp::socket& socket() { return serverSocket_; }
    
         void StartReading();
         void StartWriting();
         void WriteDone(error_code, size_t);
         void ReadDone(error_code, size_t);
         void setMessageStruct(TestManagerComInfo const&);
         void SockDiscNotifierServer();
    
         TestManagerComInfo const& getMessageStruct() const { return messageInfo; }
         TestManagerComInfo&       getMessageStruct() { return messageInfo; }
     };
    
     TcpConnectionHandler::TcpConnectionHandler(asio::any_io_executor io_context) : serverSocket_(io_context) {
         messageInfo.requestType_ = kTerminate;
         messageInfo.name_        = "COM_IPC_GROUP_001";
         messageInfo.testLogType_ = {{"Test1", kTestPass}, {"Test112", kTestFail}, {"Test3", kTestAbort}};
         messageInfo.statusType_  = kCompleted;
     }
    
     void TcpConnectionHandler::setMessageStruct(TestManagerComInfo const& passedMessageStruct) {
     #if 0
         messageInfo.requestType_ = kTerminate;
         messageInfo.name_ = "COM_IPC_GROUP_001";
         messageInfo.testLogType_ = {
             {"Test1", kTestPass},
             {"Test2", kTestFail},
             {"Test3", kTestAbort}
         };
         messageInfo.statusType_ = kCompleted;
     #endif
    
         messageInfo.requestType_ = passedMessageStruct.requestType_;
         messageInfo.name_        = passedMessageStruct.name_;
         for (auto const& entry : passedMessageStruct.testLogType_) {
             messageInfo.testLogType_[entry.first] = entry.second;
         }
         messageInfo.statusType_ = passedMessageStruct.statusType_;
     }
    
     void TcpConnectionHandler::StartReading() {
         serverSocket_.async_read_some(asio::buffer(data, max_length),
                                       boost::bind(&TcpConnectionHandler::ReadDone, shared_from_this(),
                                                   asio::placeholders::error,
                                                   asio::placeholders::bytes_transferred));
     }
    
     void TcpConnectionHandler::StartWriting() {
         std::string serializedData = serializeMessageStruct(getMessageStruct());
         serverSocket_.async_write_some(asio::buffer(serializedData, max_length),
                                        boost::bind(&TcpConnectionHandler::WriteDone, shared_from_this(),
                                                    asio::placeholders::error,
                                                    asio::placeholders::bytes_transferred));
     }
    
     void TcpConnectionHandler::ReadDone(error_code err, size_t bytes_transferred) {
         if (!err) {
             std::cout << "server received data is "
                       << std::quoted(std::string_view(data, bytes_transferred), '\'') << std::endl;
             StartWriting();
         } else {
             SockDiscNotifierServer();
             std::cerr << "Read error: " << err.message() << std::endl;
         }
     }
    
     void TcpConnectionHandler::WriteDone(error_code err, size_t /*bytes_transferred*/) {
         if (!err) {
             std::cout << "Server sent message!" << std::endl;
         } else {
             SockDiscNotifierServer();
             std::cerr << "Write error: " << err.message() << std::endl;
         }
     }
    
     void TcpConnectionHandler::SockDiscNotifierServer() {
         std::cout << "server notifier handler for socket disconnect" << std::endl;
     }
    
     /********************************************************************************************************************************************************************/
    
     TcpServer::TcpServer(asio::any_io_executor io_context, uint16_t port)
         : io_context_local(io_context)
         , acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
         start_accept();
     }
    
     void TcpServer::start_accept() {
         TcpConnectionHandler::pointer connection = TcpConnectionHandler::create(getioContext());
         acceptor_.async_accept(connection->socket(), [this, connection](error_code ec) {
             std::cout << "async_accept -> " << ec.message() << std::endl;
             if (!ec) {
                 connection->StartReading();
                 start_accept();
             }
         });
     }
    
  • File tcpClient.h

     // tcpClient.h
     #ifndef TCP_CLIENT_H
     #define TCP_CLIENT_H
    
     #include <boost/asio.hpp>
    
     /**
      *   @class  TcpClient tcpClient.h "client/tcpClient.h"
      *   @brief
      */
     class TcpClient {
       public:
         TcpClient(boost::asio::any_io_executor io_context, std::string serverIp, std::string port);
         void StartWriting();
    
       private:
         void StartReading();
    
         boost::asio::ip::tcp::socket socketClient_;
         std::string                  serverIp_, port_;
         bool                         connectStatus;
         boost::asio::streambuf       recvBuffer_;
     };
    
     #endif // TCP_CLIENT_H
    
  • File tcpClient.cpp

     // tcpClient.cpp
     #include "tcpClient.h"
     #include "AppPlugin.h"
     #include <iostream>
    
     namespace asio = boost::asio;
     using asio::ip::tcp;
     using boost::system::error_code;
    
     TcpClient::TcpClient(asio::any_io_executor ex, std::string serverIp, std::string port)
         : socketClient_(ex)
         , serverIp_(serverIp)
         , port_(port) //
     {
         connectStatus = false;
         if (!connectStatus) {
             socketClient_.connect(tcp::endpoint(asio::ip::address::from_string(serverIp_),
                                                 static_cast<uint16_t>(std::stoi(port_))));
             connectStatus = true;
         }
         std::cout << "Client Started: \n";
     }
    
     static const std::string s_msg = "Hello Server Please send me the structure!\n";
    
     void TcpClient::StartReading() {
         std::cout << "----->TCPClient::StartReading\n";
         asio::async_read( //
             socketClient_, recvBuffer_, [this](error_code error, size_t /*bytes_transferred*/) {
                 std::cout << "in TCPClient::StartReading completion handler (" << error.message() << ")\n";
                 if (!error || error == asio::error::eof) {
                     std::cout << "RECEIVED: " << DeserializeMessageStruct(recvBuffer_) << "--------\n";
                 }
             });
     }
    
     void TcpClient::StartWriting() {
         // Request/message from client
         std::cout << "----->TcpClient::StartWriting\n";
    
         // Start an asynchronous write operation
         socketClient_.async_write_some( //
             asio::buffer(s_msg), [this](error_code error, size_t /*bytes_transferred*/) {
                 if (!error) {
                     std::cout << "Client Successfully Sent Message!\n";
                     // Start reading after writing is complete
                     StartReading();
                 } else {
                     std::cerr << "Send error: " << error.message() << std::endl;
                 }
             });
     }
    
  • File test_server.cpp

     // test_server.cpp
     #include "tcpServer.h"
     #include <iostream>
    
     int main(int argc, char* argv[]) try {
         if (argc != 2) {
             std::cerr << "Missing port no: Please pass port no argument in async_tcp_server\n";
             return 1;
         }
         boost::asio::io_context io_context;
         TcpServer server(io_context.get_executor(), static_cast<uint16_t>(std::atoi(argv[1])));
         io_context.run();
     } catch (std::exception const& e) {
         std::cerr << "Exception: " << e.what() << std::endl;
     }
    
  • File test_client.cpp

     // test_client.cpp
     #include <iostream>
    
     #include "tcpClient.h"
    
     // Client side
     int main(int argc, char* argv[]) try {
         // we need 2 things: ip address and port number, in that order
         if (argc != 3) {
             std::cerr << "Usage: ip_address port" << std::endl;
             std::exit(0);
         }
         char* serverIp = argv[1];
         char* port     = argv[2];
    
         boost::asio::thread_pool ioc(1); // single thread
    
         TcpClient tcpcli(ioc.get_executor(), serverIp, port);
         tcpcli.StartWriting();
    
         ioc.join();
     } catch(std::exception const& e) {
         std::cerr << "Exception: " << e.what() << std::endl;
     }
    
  • File AppPlugin.cpp

     #include "AppPlugin.h"
     #include <ostream>
     #include <sstream>
    
     #include <boost/archive/text_iarchive.hpp>
     #include <boost/archive/text_oarchive.hpp>
     #include <boost/serialization/map.hpp>
     #include <boost/serialization/string.hpp>
    
     std::ostream& operator<<(std::ostream& os, TestManagerComInfo const& msg) {
         os << "Request Type: " << static_cast<int>(msg.requestType_) << std::endl;
         os << "Name: " << msg.name_ << std::endl;
         os << "Test Log:" << std::endl;
         for (auto const& [k, v] : msg.testLogType_) {
             os << "  " << k << ": " << v << std::endl;
         }
         os << "Status Type: " << static_cast<int>(msg.statusType_) << std::endl;
         return os;
     }
    
     template <typename Ar> void serialize(Ar& ar, TestManagerComInfo& obj, unsigned /*version*/) {
         ar& obj.requestType_& obj.name_& obj.testLogType_& obj.statusType_;
     }
    
     std::string serializeMessageStruct(TestManagerComInfo& msg) {
         std::ostringstream oss;
         {
             boost::archive::text_oarchive archive(oss);
             archive << msg;
         } // SEHE: completes the archive
         return oss.str();
     }
    
     TestManagerComInfo DeserializeMessageStruct(std::streambuf& sb) {
         std::istream is(&sb);
         TestManagerComInfo msg;
         boost::archive::text_iarchive(is) >> msg;
         return msg;
     }
    
  • File AppPlugin.h

     // AppPlugin.h
     #ifndef APPPLUGIN_H
     #define APPPLUGIN_H
     #include <map>
     #include <string>
    
     enum RequestType {
         kSatelliteStatus     = 0x00,
         kApplicationPresence = 0x01,
         kExecuteTestSuite    = 0x02,
         kTestSuiteLog        = 0x03,
         kTerminate           = 0x04,
     };
    
     enum StatusType {
         kInit      = 0x00,
         kStarted   = 0x01,
         kCompleted = 0x02,
     };
    
     enum TestResultType {
         kTestNotExecute,
         kTestPass,
         kTestFail,
         kTestAbort,
     };
    
     using TestLogType = std::map<std::string, TestResultType>;
    
     struct TestManagerComInfo {
         RequestType requestType_;
         std::string name_;
         TestLogType testLogType_;
         StatusType  statusType_;
    
         friend std::ostream& operator<<(std::ostream&, TestManagerComInfo const&);
     };
    
     /**
      * @fn serializeMessageStruct
      * @brief serialize TestManagerComInfo into string
      * @param msg is the object to serialize
      */
     std::string serializeMessageStruct(TestManagerComInfo& msg);
     TestManagerComInfo DeserializeMessageStruct(std::streambuf& sb);
    
     #endif // APPPLUGIN_H
    

Local demo:

enter image description here

Upvotes: 0

Related Questions