Reputation: 155
I am making a bidirectional asynchronous (non blocking) TCP client/server socket communication in Linux where a server can handle multiple clients. I am using boost ASIO TCP library framework for my requirements.
In my project there are two processes: client and server.
I want to transfer a custom structure (which has some enums, strings and other kind of data) over the TCP. For that I use Boost Serialization.
In both server and client I have implemented startReading()
and
startWriting()
function in async mode using async_read
and async_write
.
To test, I
start server process, like
./test_server 9876
whenever a client process start from command line like
./test_client 127.0.0.1 9876
Instantly, it will connect we see the server notification that a client is connected.
Now client sends a hello message to the server. The server immediately replies with serialized struct.
The client is not getting the reply, whether I use async_read
or async_receive
.
If I am using a blocking asio::read
in the client startReading()
, the
client gets the message.
But I want it async.
I am sharing the completed project code below, please check and give me some suggestions. I am using ubuntu 20.04 64 bit os, C++14, Boost 1.71.
Client Read Logs (which is working)
Client async_read logs which is not working
test_server.cpp 'its server process main function'
//test_server.cpp
#include <iostream>
#include "tcpServer.h"
int main(int argc, char *argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Missing port no: Please pass port no argument in async_tcp_server\n";
return 1;
}
boost::asio::io_context io_context;
TcpServer server(io_context,std::atoi(argv[1]));
io_context.run();
}
catch (std::exception &e)
{
std::cerr <<"Exception: " << e.what() << std::endl;
}
return 0;
}
This is tcpServer.h
//tcpServer.h
#ifndef TCP_SERVER_H
#define TCP_SERVER_H
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <string>
#include "AppPlugin.h"
/**
* @class TcpConnectionHandler tcpServer.h "server/tcpServer.h"
* @brief
*/
class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler>
{
private:
tcp::socket serverSocket_;
TcpConnectionHandler(boost::asio::io_context&);
TestManagerComInfo messageInfo;
public:
enum { max_length = 4096};
char data[max_length];
string message="ok client, I am sending the strucure!";
typedef boost::shared_ptr<TcpConnectionHandler> pointer;
// creating the pointer
static inline pointer create(boost::asio::io_context& io_context)
{
return pointer(new TcpConnectionHandler(io_context));
}
//socket creation
tcp::socket& socket()
{
return serverSocket_;
}
void StartReading();
void StartWriting();
void WriteDone(const boost::system::error_code&, size_t);
void ReadDone(const boost::system::error_code&, size_t);
void setMessageStruct(const TestManagerComInfo&);
TestManagerComInfo& getMessageStruct()
{
return messageInfo;
}
void SockDiscNotifierServer();
};
/**
* @class TcpServer tcpServer.h "server/tcpServer.h"
* @brief
*/
class TcpServer
{
private:
tcp::acceptor acceptor_;
boost::asio::io_context& io_context_local;
boost::asio::io_context& getioContext()
{
return io_context_local;
}
void start_accept();
public:
TcpServer(boost::asio::io_context&,short);
};
#endif // TCP_SERVER_H
This is tcpserver.cpp
//tcpServer.cpp
#include "tcpServer.h"
TcpConnectionHandler::TcpConnectionHandler(boost::asio::io_context &io_context) : serverSocket_(io_context)
{
messageInfo.requestType_ = kTerminate;
messageInfo.name_ = "COM_IPC_GROUP_001";
messageInfo.testLogType_ = {
{"Test1", kTestPass},
{"Test112", kTestFail},
{"Test3", kTestAbort}
};
messageInfo.statusType_ = kCompleted;
}
void TcpConnectionHandler::setMessageStruct(const TestManagerComInfo &passedMessageStruct)
{
#if 0
messageInfo.requestType_ = kTerminate;
messageInfo.name_ = "COM_IPC_GROUP_001";
messageInfo.testLogType_ = {
{"Test1", kTestPass},
{"Test2", kTestFail},
{"Test3", kTestAbort}
};
messageInfo.statusType_ = kCompleted;
#endif
messageInfo.requestType_ = passedMessageStruct.requestType_;
messageInfo.name_ = passedMessageStruct.name_;
for (const auto &entry : passedMessageStruct.testLogType_)
{
messageInfo.testLogType_[entry.first] = entry.second;
}
messageInfo.statusType_ = passedMessageStruct.statusType_;
}
void TcpConnectionHandler::StartReading()
{
serverSocket_.async_read_some(
boost::asio::buffer(data, max_length),
boost::bind(&TcpConnectionHandler::ReadDone,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void TcpConnectionHandler::StartWriting()
{
string serializedData = serializeMessageStruct(getMessageStruct());
serverSocket_.async_write_some(
boost::asio::buffer(serializedData, max_length),
boost::bind(&TcpConnectionHandler::WriteDone,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void TcpConnectionHandler::ReadDone(const boost::system::error_code &err, size_t bytes_transferred)
{
if (!err)
{
cout << "server received data is '" << data << "'\n";
StartWriting();
}
else
{
SockDiscNotifierServer();
cerr << "Read error: " << err.message() << endl;
}
}
void TcpConnectionHandler::WriteDone(const boost::system::error_code &err, size_t bytes_transferred)
{
if (!err)
{
cout << "Server sent message!" << endl;
serverSocket_.is_open()?cout<<"Server socket is still connected\n":cout<<"server socket got disconnected\n";
}
else
{
SockDiscNotifierServer();
cerr << "Write error: " << err.message() << endl;
}
}
void TcpConnectionHandler::SockDiscNotifierServer()
{
cout<<"server notifier handler for socket disconnect\n";
}
/********************************************************************************************************************************************************************/
TcpServer::TcpServer(boost::asio::io_context &io_context, short port_No) : io_context_local(io_context), acceptor_(io_context,
tcp::endpoint(tcp::v4(), port_No))
{
start_accept();
}
void TcpServer::start_accept()
{
TcpConnectionHandler::pointer connection = TcpConnectionHandler::create(getioContext());
acceptor_.async_accept(connection->socket(), [this,connection](boost::system::error_code ec) {
std::cout << "async_accept -> " << ec.message() << "\n";
if (!ec) {
connection->StartReading();
start_accept();
}
});
}
test_client.cpp 'its client process main function'
//test_client.cpp
#include <iostream>
#include "tcpClient.h"
// Client side
int main(int argc, char *argv[])
{
// we need 2 things: ip address and port number, in that order
if (argc != 3)
{
cerr << "Usage: ip_address port" << endl;
exit(0);
}
char *serverIp = argv[1];
char *port = argv[2];
boost::asio::io_context io_context;
TcpClient tcpcli(io_context,serverIp, port);
std::thread io_thread([&io_context]() {
io_context.run();
});
tcpcli.StartWriting();
sleep(1);
tcpcli.StartReading();
io_thread.join();
return 0;
}
This is tcpClient.h
//tcpClient.h
#ifndef TCP_CLIENT_H
#define TCP_CLIENT_H
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <cstdlib>
#include <iostream>
#include <string>
#include "AppPlugin.h"
/**
* @class TcpClient tcpClient.h "client/tcpClient.h"
* @brief
*/
class TcpClient
{
public:
TcpClient(boost::asio::io_context& io_context, string serverIp, string port);
void readData(const boost::system::error_code&, size_t);
void sendData(const boost::system::error_code&, size_t);
enum
{
max_length = 4096
};
void SockDiscNotifierClient();
boost::asio::ip::tcp::socket socketClient_;
void StartReading();
void StartWriting();
private:
string serverIp_, port_;
bool connectStatus;
boost::asio::streambuf recvBuffer_;
boost::asio::io_context& io_context_local;
boost::asio::io_context& getioContext()
{
return io_context_local;
}
//TestManagerComInfo DeserializeMessageStruct(boost::asio::streambuf&);
};
#endif // TCP_CLIENT_H
This is tcpClient.cpp In this code I have implemented 3 startReading() and startWriting() functions. The first is implemented by me and working second I have reffered from google and boost documentation 3rd I have taken reference from chatgpt.
//tcpClient.cpp
#include "tcpClient.h"
TcpClient::TcpClient(boost::asio::io_context& io_context, string serverIp, string port)
: serverIp_(serverIp), port_(port), socketClient_(io_context), io_context_local(io_context)
{
connectStatus = false;
if(!connectStatus)
{
socketClient_.connect(tcp::endpoint(
boost::asio::ip::address::from_string(serverIp_), std::stoi(port_)));
connectStatus = true;
}
std::cout << "Client Started: \n";
}
//This StartReading is working
#if 1 //first startreading
void TcpClient::StartReading()
{
// getting response from server
cout<<"----->TcpClient::StartReading\n";
boost::system::error_code error;
//boost::asio::streambuf receiveBuff;
boost::asio::read(socketClient_, recvBuffer_, boost::asio::transfer_all(), error);
if (error && error != boost::asio::error::eof)
{
SockDiscNotifierClient();
cout << "receive failed: " << error.message() << endl;
}
else
{
DeserializeMessageStruct(recvBuffer_);
}
}
#endif
//This StartReading is not working
#if 0 //second startreading
void TcpClient::StartReading()
{
cout << "----->TCPClient::StartReading\n";
boost::asio::async_read(socketClient_, recvBuffer_, boost::asio::transfer_all(),
[this](const boost::system::error_code &error, std::size_t bytes_transferred)
{
cout<<"in TCPClient::StartReading labda\n";
// getioContext().run();
if (!error)
{
std::string receivedData(boost::asio::buffer_cast<const char*>(recvBuffer_.data()), boost::asio::buffer_size(recvBuffer_.data()));
cout<<"deserializing client received\n"<<receivedData;
recvBuffer_.consume(bytes_transferred);
StartReading();
//DeserializeMessageStruct(recvBuffer_);
}
else if (error == boost::asio::error::eof)
{
std::cerr << "Server closed the connection." << std::endl;
socketClient_.close(); // Close the socket
}
else
{
std::cerr << "Client: Error reading data: " << error.message() << std::endl;
}
});
}
#endif
//This is suggested by chatgpt and its also not working
#if 0 //third startreading
void TcpClient::StartReading() {
// Create a lambda function that encapsulates the StartReading logic
boost::asio::io_context& io_context = static_cast<boost::asio::io_context&>(socketClient_.get_executor().context());
auto readOperation = [this,&io_context]() {
cout << "----->TCPClient::StartReading\n";
socketClient_.async_receive(
boost::asio::buffer(recvBuffer_.prepare(max_length)),
[this](const boost::system::error_code &err, size_t bytes_transferred)
{
if (!err)
{
//recvBuffer_.commit(bytes_transferred); // Commit the received data
DeserializeMessageStruct(recvBuffer_);
}
else
{
SockDiscNotifierClient();
cerr << "Read error: " << err.message() << endl;
}
});
io_context.run();
};
std::thread readingThread(readOperation);
readingThread.join();
}
#endif
#if 1 //First StartWriting
void TcpClient::StartWriting()
{
// request/message from client
const string msg = "Hello Server Please send me the strucure!\n";
cout<<"----->TcpClient::StartWritin\n";
socketClient_.async_write_some(
boost::asio::buffer(msg, max_length),
boost::bind(&TcpClient::sendData, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
#endif
#if 0 //Second StartWriting
void TcpClient::StartWriting()
{
// Request/message from client
const string msg = "Hello Server Please send me the structure!\n";
cout << "----->TcpClient::StartWriting\n";
// Start an asynchronous write operation
socketClient_.async_write_some(
boost::asio::buffer(msg, max_length),
[this](const boost::system::error_code &error, std::size_t bytes_transferred)
{
if (!error)
{
cout << "Client Successfully Sent Message!\n";
// Start reading after writing is complete
StartReading();
}
else
{
std::cerr << "Send error: " << error.message() << std::endl;
}
});
}
#endif
#if 0 //Third StartWriting
void TcpClient::StartWriting() {
// Request/message from client
boost::asio::io_context& io_context = static_cast<boost::asio::io_context&>(socketClient_.get_executor().context());
const string msg = "Hello Server Please send me the structure!\n";
cout << "----->TcpClient::StartWriting\n";
// Create a lambda function that encapsulates the StartWriting logic
auto writeOperation = [this, &io_context, msg]() {
socketClient_.async_write_some(
boost::asio::buffer(msg, max_length),
[this](const boost::system::error_code &error, std::size_t bytes_transferred) {
if (!error) {
cout << "Client Successfully Sent Message!\n";
// Start reading after writing is complete
StartReading();
} else {
std::cerr << "Send error: " << error.message() << std::endl;
}
});
io_context.run();
};
// Create a thread and execute the lambda function
std::thread writingThread(writeOperation);
// Wait for the thread to finish
writingThread.join();
}
#endif
void TcpClient::readData(const boost::system::error_code &err,
size_t bytes_transferred)
{
if (!err)
{
cout << "Client Successfuly Read Message! ";
}
else
{
SockDiscNotifierClient();
cerr << "Read error: " << err.message() << endl;
}
}
void TcpClient::sendData(const boost::system::error_code &err,
size_t bytes_transferred)
{
if (!err)
{
cout << "Client Successfuly Sent Message!\n";
}
else
{
SockDiscNotifierClient();
cerr << "Send error: " << err.message() << endl;
}
}
void TcpClient::SockDiscNotifierClient()
{
cout<<"client notifier handler for socket disconnect";
}
This is AppPlugin.h, which contains serialize and deserialize function also the defination of structure
//AppPlugin.h
#ifndef APPPLUGIN_H
#define APPPLUGIN_H
#include <bits/stdc++.h>
#include <boost/serialization/map.hpp>
#include <boost/serialization/string.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
using boost::asio::ip::tcp;
using std::cerr;
using std::cin;
using std::cout;
using std::endl;
using std::string;
enum RequestType
{
kSatelliteStatus = 0x00,
kApplicationPresence = 0x01,
kExecuteTestSuite = 0x02,
kTestSuiteLog = 0x03,
kTerminate = 0x04
};
enum StatusType
{
kInit = 0x00,
kStarted = 0x01,
kCompleted = 0x02
};
enum TestResultType
{
kTestNotExecute,
kTestPass,
kTestFail,
kTestAbort
};
using TestLogType = std::map<std::string, TestResultType>;
#pragma pack(push, 1) // Pack the structure
struct TestManagerComInfo
{
RequestType requestType_;
std::string name_;
TestLogType testLogType_;
StatusType statusType_;
template <typename Archive>
void serialize(Archive& ar, const unsigned int version)
{
ar& requestType_;
ar& name_;
ar& testLogType_;
ar& statusType_;
}
};
#pragma pack(pop) // Restore default packing
/**
* @fn serializeMessageStruct
* @brief Its for serialize the TestManagerComInfo struct and convert into the string
* @param messageInfoNeedtoSerialize is the argunment which is TestManagerComInfo's object
*/
inline string serializeMessageStruct(TestManagerComInfo &messageInfoNeedtoSerialize)
{
std::ostringstream archiveStream;
boost::archive::text_oarchive archive(archiveStream);
archive << messageInfoNeedtoSerialize;
return archiveStream.str();
}
inline TestManagerComInfo DeserializeMessageStruct(boost::asio::streambuf& deserializeBuff)
{
TestManagerComInfo receivedMsgStruct;
std::istream archiveStream(&deserializeBuff);
boost::archive::text_iarchive archive(archiveStream);
archive >> receivedMsgStruct;
cout << "Received Request Type: "
<< static_cast<int>(receivedMsgStruct.requestType_) << std::endl;
cout << "Received Name: " << receivedMsgStruct.name_ << std::endl;
cout << "Received Test Log:" << endl;
for (const auto &test : receivedMsgStruct.testLogType_)
{
cout << " " << test.first << ": " << test.second << endl;
}
std::cout << "Received Status Type: "
<< static_cast<int>(receivedMsgStruct.statusType_)
<< std::endl;
return receivedMsgStruct;
}
#endif // APPPLUGIN_H
Upvotes: 1
Views: 306
Reputation: 392833
A first question that rises is: why do you want to make the operation async.
A second question is: what implementation do you wish to land on. I'm going to assume implementation 2 - the one that you hand-wrote.
Regardless of everything, the problem is the same:
TcpClient tcpcli(io_context, serverIp, port);
std::thread io_thread([&io_context]() { io_context.run(); });
tcpcli.StartWriting();
sleep(1);
The io_thread
runs out of work. Whether it does so before StartWriting()
is invoked is a race condition - it will depend on the weather conditions. However, the sleep(1)
simply guarantees that io_thread
is done¹.
You need a work guard of sorts to prevent the thread from exiting prematurely. My favorite approach is to use asio::thread_pool
which does this internally:
boost::asio::thread_pool io_context(1); // single thread
TcpClient tcpcli(io_context.get_executor(), serverIp, port);
tcpcli.StartWriting();
io_context.join();
This requires you to modify your code to accept executors instead of hard-coding io_context&
.
Glad you asked. The TcpServer
is different, because it kicks of an infinite chain of async operations from inside its constructor, so there is always work:
TcpServer::TcpServer(boost::asio::io_context& io_context, std::uint16_t port_No)
: io_context_local(io_context)
, acceptor_(io_context, tcp::endpoint(tcp::v4(), port_No)) {
start_accept();
}
Your code has some issues.
io_context&
references throughout is unnecessarily limiting.DeserializeMessageStruct
not only doesn't belong in the header,So we get `AppPlugin.cpp`:
#include "AppPlugin.h"
#include <ostream>
#include <sstream>
std::ostream& operator<<(std::ostream& os, TestManagerComInfo const& msg) {
os << "Request Type: " << static_cast<int>(msg.requestType_) << std::endl;
os << "Name: " << msg.name_ << std::endl;
os << "Test Log:" << std::endl;
for (auto const& [k, v] : msg.testLogType_) {
os << " " << k << ": " << v << std::endl;
}
os << "Status Type: " << static_cast<int>(msg.statusType_) << std::endl;
return os;
}
std::string serializeMessageStruct(TestManagerComInfo& msg) {
std::ostringstream oss;
{
boost::archive::text_oarchive archive(oss);
archive << msg;
} // SEHE: completes the archive
return oss.str();
}
TestManagerComInfo DeserializeMessageStruct(std::streambuf& deserializeBuff) {
TestManagerComInfo msg;
std::istream archiveStream(&deserializeBuff);
boost::archive::text_iarchive(archiveStream) >> msg;
return msg;
}
And then on usage, you write:
TestManagerComInfo msg = DeserializeMessageStruct(recvBuffer_);
std::cout << "RECEIVED: " << msg;
You clearly didn't enable compiler warnings, or you would notice the type mismatches on ports and e.g. wrong member initialization orders, unused variables etc.
Note that #pragma pack
has no effect if you're serializing using Boost
Serialization as you do. In fact, it's unportable by definition and can
only be a performance pessimization as your struct isn't trivial,
standard-layout or POD anyways.
The dance with buffers is very inefficient:
std::string receivedData(asio::buffer_cast<const char*>(recvBuffer_.data()),
asio::buffer_size(recvBuffer_.data()));
std::cout << "deserializing client received\n" << receivedData;
TestManagerComInfo msg = DeserializeMessageStruct(recvBuffer_);
recvBuffer_.consume(bytes_transferred);
This could "just" be
std::cout << "RECEIVED: " << DeserializeMessageStruct(recvBuffer_);
You also redundantly invoke StartReading
from main, where it already happens from the completion handler in StartWriting
.
The message "Server socket is still connected" is misleading, as is_open
doesn't imply that the peer hasn't shutdown the connection.
tranfer_all()
is the default completion condition for async_read
.
Your async_write_some
needs to be asio::async_write
to ensure the full buffer gets sent. You are passing a local variable as the buffer, which is UB because it doesn't live till completion of the async operation. Either make it static or e.g. a member variable.
Applying the unrelated max_length
to it is ... useless at best.
SockDiscNotifierClient
is misleading, as it's not always called in response to disconnection. That said, sendData
/readData
seem unnecessary now, so I'll just remove them.
There's a bunch of inconsistent using namespace
and std::endl
vs '\n'
(use std::endl
if you expect a flush).
Hmm. Now I'm noticing the server also uses async_read_some
and async_write_some
instead of composed operations that avoid partial messages. I'll leave that as an exercise for the reader for now.
PS: later I changed the interface to take an
istream
instead ofstreambuf
¹ (unless somehow writing took longer than a second, which seems improbable)
File tcpServer.h
// tcpServer.h
#ifndef TCP_SERVER_H
#define TCP_SERVER_H
#include "AppPlugin.h"
#include <boost/asio.hpp>
#include <string>
/**
* @class TcpServer tcpServer.h "server/tcpServer.h"
* @brief
*/
class TcpServer {
private:
boost::asio::any_io_executor io_context_local;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::any_io_executor getioContext() { return io_context_local; }
void start_accept();
public:
TcpServer(boost::asio::any_io_executor, uint16_t);
};
#endif // TCP_SERVER_H
File tcpServer.cpp
// tcpServer.cpp
#include "tcpServer.h"
#include <boost/bind/bind.hpp>
#include <iomanip>
#include <iostream>
#include <memory>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
/**
* @class TcpConnectionHandler tcpServer.h "server/tcpServer.h"
* @brief
*/
class TcpConnectionHandler : public std::enable_shared_from_this<TcpConnectionHandler> {
private:
tcp::socket serverSocket_;
TcpConnectionHandler(asio::any_io_executor);
TestManagerComInfo messageInfo;
enum { max_length = 4096 };
char data[max_length];
std::string message = "ok client, I am sending the structure!";
public:
using pointer = std::shared_ptr<TcpConnectionHandler>;
// creating the pointer
static inline pointer create(asio::any_io_executor io_context) {
return pointer(new TcpConnectionHandler(io_context));
}
// socket creation
tcp::socket& socket() { return serverSocket_; }
void StartReading();
void StartWriting();
void WriteDone(error_code, size_t);
void ReadDone(error_code, size_t);
void setMessageStruct(TestManagerComInfo const&);
void SockDiscNotifierServer();
TestManagerComInfo const& getMessageStruct() const { return messageInfo; }
TestManagerComInfo& getMessageStruct() { return messageInfo; }
};
TcpConnectionHandler::TcpConnectionHandler(asio::any_io_executor io_context) : serverSocket_(io_context) {
messageInfo.requestType_ = kTerminate;
messageInfo.name_ = "COM_IPC_GROUP_001";
messageInfo.testLogType_ = {{"Test1", kTestPass}, {"Test112", kTestFail}, {"Test3", kTestAbort}};
messageInfo.statusType_ = kCompleted;
}
void TcpConnectionHandler::setMessageStruct(TestManagerComInfo const& passedMessageStruct) {
#if 0
messageInfo.requestType_ = kTerminate;
messageInfo.name_ = "COM_IPC_GROUP_001";
messageInfo.testLogType_ = {
{"Test1", kTestPass},
{"Test2", kTestFail},
{"Test3", kTestAbort}
};
messageInfo.statusType_ = kCompleted;
#endif
messageInfo.requestType_ = passedMessageStruct.requestType_;
messageInfo.name_ = passedMessageStruct.name_;
for (auto const& entry : passedMessageStruct.testLogType_) {
messageInfo.testLogType_[entry.first] = entry.second;
}
messageInfo.statusType_ = passedMessageStruct.statusType_;
}
void TcpConnectionHandler::StartReading() {
serverSocket_.async_read_some(asio::buffer(data, max_length),
boost::bind(&TcpConnectionHandler::ReadDone, shared_from_this(),
asio::placeholders::error,
asio::placeholders::bytes_transferred));
}
void TcpConnectionHandler::StartWriting() {
std::string serializedData = serializeMessageStruct(getMessageStruct());
serverSocket_.async_write_some(asio::buffer(serializedData, max_length),
boost::bind(&TcpConnectionHandler::WriteDone, shared_from_this(),
asio::placeholders::error,
asio::placeholders::bytes_transferred));
}
void TcpConnectionHandler::ReadDone(error_code err, size_t bytes_transferred) {
if (!err) {
std::cout << "server received data is "
<< std::quoted(std::string_view(data, bytes_transferred), '\'') << std::endl;
StartWriting();
} else {
SockDiscNotifierServer();
std::cerr << "Read error: " << err.message() << std::endl;
}
}
void TcpConnectionHandler::WriteDone(error_code err, size_t /*bytes_transferred*/) {
if (!err) {
std::cout << "Server sent message!" << std::endl;
} else {
SockDiscNotifierServer();
std::cerr << "Write error: " << err.message() << std::endl;
}
}
void TcpConnectionHandler::SockDiscNotifierServer() {
std::cout << "server notifier handler for socket disconnect" << std::endl;
}
/********************************************************************************************************************************************************************/
TcpServer::TcpServer(asio::any_io_executor io_context, uint16_t port)
: io_context_local(io_context)
, acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
start_accept();
}
void TcpServer::start_accept() {
TcpConnectionHandler::pointer connection = TcpConnectionHandler::create(getioContext());
acceptor_.async_accept(connection->socket(), [this, connection](error_code ec) {
std::cout << "async_accept -> " << ec.message() << std::endl;
if (!ec) {
connection->StartReading();
start_accept();
}
});
}
File tcpClient.h
// tcpClient.h
#ifndef TCP_CLIENT_H
#define TCP_CLIENT_H
#include <boost/asio.hpp>
/**
* @class TcpClient tcpClient.h "client/tcpClient.h"
* @brief
*/
class TcpClient {
public:
TcpClient(boost::asio::any_io_executor io_context, std::string serverIp, std::string port);
void StartWriting();
private:
void StartReading();
boost::asio::ip::tcp::socket socketClient_;
std::string serverIp_, port_;
bool connectStatus;
boost::asio::streambuf recvBuffer_;
};
#endif // TCP_CLIENT_H
File tcpClient.cpp
// tcpClient.cpp
#include "tcpClient.h"
#include "AppPlugin.h"
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
TcpClient::TcpClient(asio::any_io_executor ex, std::string serverIp, std::string port)
: socketClient_(ex)
, serverIp_(serverIp)
, port_(port) //
{
connectStatus = false;
if (!connectStatus) {
socketClient_.connect(tcp::endpoint(asio::ip::address::from_string(serverIp_),
static_cast<uint16_t>(std::stoi(port_))));
connectStatus = true;
}
std::cout << "Client Started: \n";
}
static const std::string s_msg = "Hello Server Please send me the structure!\n";
void TcpClient::StartReading() {
std::cout << "----->TCPClient::StartReading\n";
asio::async_read( //
socketClient_, recvBuffer_, [this](error_code error, size_t /*bytes_transferred*/) {
std::cout << "in TCPClient::StartReading completion handler (" << error.message() << ")\n";
if (!error || error == asio::error::eof) {
std::cout << "RECEIVED: " << DeserializeMessageStruct(recvBuffer_) << "--------\n";
}
});
}
void TcpClient::StartWriting() {
// Request/message from client
std::cout << "----->TcpClient::StartWriting\n";
// Start an asynchronous write operation
socketClient_.async_write_some( //
asio::buffer(s_msg), [this](error_code error, size_t /*bytes_transferred*/) {
if (!error) {
std::cout << "Client Successfully Sent Message!\n";
// Start reading after writing is complete
StartReading();
} else {
std::cerr << "Send error: " << error.message() << std::endl;
}
});
}
File test_server.cpp
// test_server.cpp
#include "tcpServer.h"
#include <iostream>
int main(int argc, char* argv[]) try {
if (argc != 2) {
std::cerr << "Missing port no: Please pass port no argument in async_tcp_server\n";
return 1;
}
boost::asio::io_context io_context;
TcpServer server(io_context.get_executor(), static_cast<uint16_t>(std::atoi(argv[1])));
io_context.run();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
File test_client.cpp
// test_client.cpp
#include <iostream>
#include "tcpClient.h"
// Client side
int main(int argc, char* argv[]) try {
// we need 2 things: ip address and port number, in that order
if (argc != 3) {
std::cerr << "Usage: ip_address port" << std::endl;
std::exit(0);
}
char* serverIp = argv[1];
char* port = argv[2];
boost::asio::thread_pool ioc(1); // single thread
TcpClient tcpcli(ioc.get_executor(), serverIp, port);
tcpcli.StartWriting();
ioc.join();
} catch(std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
File AppPlugin.cpp
#include "AppPlugin.h"
#include <ostream>
#include <sstream>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/string.hpp>
std::ostream& operator<<(std::ostream& os, TestManagerComInfo const& msg) {
os << "Request Type: " << static_cast<int>(msg.requestType_) << std::endl;
os << "Name: " << msg.name_ << std::endl;
os << "Test Log:" << std::endl;
for (auto const& [k, v] : msg.testLogType_) {
os << " " << k << ": " << v << std::endl;
}
os << "Status Type: " << static_cast<int>(msg.statusType_) << std::endl;
return os;
}
template <typename Ar> void serialize(Ar& ar, TestManagerComInfo& obj, unsigned /*version*/) {
ar& obj.requestType_& obj.name_& obj.testLogType_& obj.statusType_;
}
std::string serializeMessageStruct(TestManagerComInfo& msg) {
std::ostringstream oss;
{
boost::archive::text_oarchive archive(oss);
archive << msg;
} // SEHE: completes the archive
return oss.str();
}
TestManagerComInfo DeserializeMessageStruct(std::streambuf& sb) {
std::istream is(&sb);
TestManagerComInfo msg;
boost::archive::text_iarchive(is) >> msg;
return msg;
}
File AppPlugin.h
// AppPlugin.h
#ifndef APPPLUGIN_H
#define APPPLUGIN_H
#include <map>
#include <string>
enum RequestType {
kSatelliteStatus = 0x00,
kApplicationPresence = 0x01,
kExecuteTestSuite = 0x02,
kTestSuiteLog = 0x03,
kTerminate = 0x04,
};
enum StatusType {
kInit = 0x00,
kStarted = 0x01,
kCompleted = 0x02,
};
enum TestResultType {
kTestNotExecute,
kTestPass,
kTestFail,
kTestAbort,
};
using TestLogType = std::map<std::string, TestResultType>;
struct TestManagerComInfo {
RequestType requestType_;
std::string name_;
TestLogType testLogType_;
StatusType statusType_;
friend std::ostream& operator<<(std::ostream&, TestManagerComInfo const&);
};
/**
* @fn serializeMessageStruct
* @brief serialize TestManagerComInfo into string
* @param msg is the object to serialize
*/
std::string serializeMessageStruct(TestManagerComInfo& msg);
TestManagerComInfo DeserializeMessageStruct(std::streambuf& sb);
#endif // APPPLUGIN_H
Local demo:
Upvotes: 0