Reputation: 797
I've just started working with Boost. I'm writing a TCP client-server application with asynchronous sockets.
The task is the following:
Currently, the following works:
How can I allow the client to enter numbers from the keyboard and wait for an answer from the server at the same time?
And why does my client not wait for the answer from the server?
The client code:
using boost::asio::ip::tcp;
// Asynchronous TCP client built on boost::asio. It connects to one of the
// resolved endpoints, sends a string typed on the keyboard and then keeps
// receiving data from the server.
class TCPClient
{
public:
// Starts an asynchronous connect to the endpoint EndPointIter refers to.
TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter);
// Posts a request to the io_service to close the socket.
void Close();
private:
// io_service that the socket and the posted close request are bound to.
boost::asio::io_service& m_IOService;
tcp::socket m_Socket;
// Text of the outgoing request.
string m_SendBuffer;
// Maximum number of bytes requested by a single receive.
static const size_t m_BufLen = 100;
// Storage for incoming data; twice as large as a single receive request.
char m_RecieveBuffer[m_BufLen*2];
// Completion handlers of the asynchronous connect, receive and send operations.
void OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter);
void OnReceive(const boost::system::error_code& ErrorCode);
void OnSend(const boost::system::error_code& ErrorCode);
// Closes the socket.
void DoClose();
};
// Binds the client to IO_Service and starts connecting to the endpoint that
// EndPointIter currently refers to. The advanced iterator is handed to
// OnConnect so that it can fall back to the next endpoint on failure.
TCPClient::TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter)
    : m_IOService(IO_Service),
      m_Socket(IO_Service),
      m_SendBuffer("")
{
    // Take a copy of the current endpoint before stepping the iterator.
    tcp::endpoint FirstEndPoint = *EndPointIter;
    tcp::resolver::iterator NextEndPoint = ++EndPointIter;

    m_Socket.async_connect(
        FirstEndPoint,
        boost::bind(&TCPClient::OnConnect, this,
                    boost::asio::placeholders::error,
                    NextEndPoint));
}
// Asks for the connection to be shut down. The socket is not touched here;
// instead DoClose is queued on the io_service.
void TCPClient::Close()
{
    m_IOService.post(boost::bind(&TCPClient::DoClose, this));
}
// Completion handler of async_connect.
//
// On success it reads one whitespace-delimited token from standard input and
// writes it to the server followed by a terminating '\0'; OnSend is invoked
// once the whole message has been written. On failure it retries with the
// next endpoint, or reports the error when no endpoint is left.
//
// ErrorCode    - result of the connect attempt.
// EndPointIter - the next endpoint to try if this attempt failed.
void TCPClient::OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter)
{
    cout << "OnConnect..." << endl;
    if (!ErrorCode)
    {
        // NOTE: this blocks the thread running the io_service until the user
        // has typed something.
        cin >> m_SendBuffer;
        cout << "Entered: " << m_SendBuffer << endl;

        // c_str() is always NUL-terminated, so length() + 1 already includes
        // the terminator; nothing has to be appended to the string.
        // async_write is used because a single async_send is allowed to
        // transmit only part of the buffer.
        boost::asio::async_write(m_Socket,
            boost::asio::buffer(m_SendBuffer.c_str(), m_SendBuffer.length() + 1),
            boost::bind(&TCPClient::OnSend, this,
                        boost::asio::placeholders::error));
    }
    else if (EndPointIter != tcp::resolver::iterator())
    {
        // This endpoint failed: reset the socket and try the next one.
        m_Socket.close();
        tcp::endpoint EndPoint = *EndPointIter;
        m_Socket.async_connect(EndPoint,
            boost::bind(&TCPClient::OnConnect, this,
                        boost::asio::placeholders::error, ++EndPointIter));
    }
    else
    {
        // Every endpoint has been tried; do not fail silently.
        cerr << "Connect failed: " << ErrorCode.message() << endl;
    }
}
// Completion handler of async_receive: prints what is in the receive buffer
// and starts the next receive, or closes the socket on error.
//
// ErrorCode - result of the receive operation.
void TCPClient::OnReceive(const boost::system::error_code& ErrorCode)
{
    cout << "receiving..." << endl;
    if (!ErrorCode)
    {
        // This handler is not told how many bytes arrived, and the peer is
        // not guaranteed to send a '\0'. A receive writes at most m_BufLen
        // bytes and the array is larger than that, so a terminator at index
        // m_BufLen is never overwritten by data and bounds the print below.
        m_RecieveBuffer[m_BufLen] = '\0';
        cout << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::OnReceive, this,
                        boost::asio::placeholders::error));
    }
    else
    {
        cout << "ERROR! OnReceive..." << endl;
        DoClose();
    }
}
// Completion handler of the send operation. After a successful send the
// outgoing text is discarded and the client starts waiting for the reply;
// after a failed send the socket is closed.
//
// ErrorCode - result of the send operation.
void TCPClient::OnSend(const boost::system::error_code& ErrorCode)
{
    cout << "sending..." << endl;

    // Failure path first, so the success path reads straight through.
    if (ErrorCode)
    {
        cout << "OnSend closing" << endl;
        DoClose();
        return;
    }

    cout << "\"" << m_SendBuffer << "\" has been sent" << endl;
    m_SendBuffer.clear();

    m_Socket.async_receive(
        boost::asio::buffer(m_RecieveBuffer, m_BufLen),
        boost::bind(&TCPClient::OnReceive, this,
                    boost::asio::placeholders::error));
}
void TCPClient::DoClose()
{
m_Socket.close();
}
int main()
{
try
{
cout << "Client is starting..." << endl;
boost::asio::io_service IO_Service;
tcp::resolver Resolver(IO_Service);
string port = "13";
tcp::resolver::query Query("127.0.0.1", port);
tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query);
TCPClient Client(IO_Service, EndPointIterator);
cout << "Client is started!" << endl;
cout << "Enter a query string " << endl;
boost::thread ClientThread(boost::bind(&boost::asio::io_service::run, &IO_Service));
Client.Close();
ClientThread.join();
}
catch (exception& e)
{
cerr << e.what() << endl;
}
cout << "\nClosing";
getch();
}
Here is output from console
Client is starting...
Client is started!
OnConnect...
12
Entered: 12
sending...
"12" has been sent
receiving...
ERROR! OnReceive...
Closing
Server part
class Session
{
public:
Session(boost::asio::io_service& io_service)
: socket_(io_service)
{
dataRx[0] = '\0';
dataTx[0] = '\0';
}
tcp::socket& socket()
{
return socket_;
}
void start()
{
socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
boost::bind(&Session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{
cout << "reading..." << endl;
cout << "Data: " << dataRx << endl;
if (!error)
{
if (!isValidData())
{
cout << "Bad data!" << endl;
sprintf(dataTx, "Bad data!\0");
dataRx[0] = '\0';
}
else
{
sprintf(dataTx, getFactorization().c_str());
dataRx[0] = '\0';
}
boost::asio::async_write(socket_,
boost::asio::buffer(dataTx, max_length*2),
boost::bind(&Session::handle_write, this,
boost::asio::placeholders::error));
}
else
{
delete this;
}
}
void handle_write(const boost::system::error_code& error)
{
cout << "writing..." << endl;
if (!error)
{
cout << "dataTx sent: " << dataTx << endl;
dataTx[0] = '\0';
socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
boost::bind(&Session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else
{
delete this;
}
}
string getFactorization() const
{
//Do something
}
bool isValidData()
{
locale loc;
for (int i = 0; i < strlen(dataRx); i++)
if (!isdigit(dataRx[i],loc))
return false;
return true;
}
private:
tcp::socket socket_;
static const size_t max_length = 100;
char dataRx[max_length];
char dataTx[max_length*2];
};
class Server
{
public:
Server(boost::asio::io_service& io_service, short port)
: io_service_(io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
{
Session* new_session = new Session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&Server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
void handle_accept(Session* new_session, const boost::system::error_code& error)
{
if (!error)
{
new_session->start();
new_session = new Session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&Server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
else
{
delete new_session;
}
}
private:
boost::asio::io_service& io_service_;
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
cout << "Server is runing..." << endl;
try
{
boost::asio::io_service io_service;
int port = 13;
Server s(io_service, port);
cout << "Server is run!" << endl;
io_service.run();
}
catch (boost::system::error_code& e)
{
std::cerr << e << "\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
Server's output
Server is runing...
Server is run!
reading...
Data: 12
writing...
dataTx sent: 13 //just send back received ++number
reading...
Data:
Your help would be greatly appreciated.
========
Added
OK, I understand. But the check ErrorCode == boost::asio::error::eof does not work... What have I done wrong?
// Error branches added to OnReceive: end-of-file is reported on its own,
// every other error is printed with its code and closes the socket.
else if (ErrorCode == boost::asio::error::eof)
{
// End-of-file is only reported here; the socket is not closed.
cout << "boost::asio::error::eof in OnReceive!" << endl;
}
else
{
cout << "ERROR! OnReceive..." << ErrorCode << endl;
DoClose();
}
The printed output is ERROR! OnReceive...system:10009
It seems my comparison is incorrect.
========
Added
I found the root cause. I've started to use async_receive
(instead of async_read_some
) and swapped the lines in main
to
ClientThread.join();
Client.Close();
Now it works fine!
Now I'm trying to read and write data from/to the socket at the same time (because the client should be able to send additional requests before the answer from the server is received).
In OnConnect
function I create boost threads:
// One thread per task, each running a member function of this client:
// queueing keyboard input, receiving, and sending (startSending is not shown
// in the post).
// NOTE(review): startReceiving uses m_Socket and startSending presumably does
// too; a single asio socket is not safe for unsynchronised use from several
// threads - confirm how these calls are serialised.
boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop, this));
boost::thread receivingThread(boost::bind(&TCPClient::startReceiving, this));
boost::thread sendingThread(boost::bind(&TCPClient::startSending, this));
with the implementation:
// Arms the first asynchronous receive. Completions are delivered to
// receivingLoop, which prints the data and re-arms the receive.
void TCPClient::startReceiving()
{
    cout << "receiving..." << endl;
    m_RecieveBuffer[0] = '\0';
    m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
        boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error)); //runtime error here
    // The buffer is deliberately not printed here: async_receive only starts
    // the operation and returns at once, so no data has arrived yet and the
    // buffer belongs to the pending operation until receivingLoop is called.
}
// Completion handler of async_receive: prints what is in the receive buffer
// and starts the next receive, or reports the error and closes the socket.
//
// ErrorCode - result of the receive operation.
void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode)
{
    cout << "receiving..." << endl;
    if (!ErrorCode)
    {
        // This handler is not told how many bytes arrived, and the peer is
        // not guaranteed to send a '\0'. A receive writes at most m_BufLen
        // bytes and the array is larger than that, so a terminator at index
        // m_BufLen is never overwritten by data and bounds the print below.
        m_RecieveBuffer[m_BufLen] = '\0';
        cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::receivingLoop, this,
                        boost::asio::placeholders::error));
    }
    else
    {
        cout << "ERROR! receivingLoop..." << ErrorCode << endl;
        DoClose();
    }
}
void TCPClient::addMsgLoop()
{
while (true)
{
string tmp;
cin >> tmp;
cout << "Entered: " << tmp << endl;
tmp += "\0";
try
{
msgQueue.push(tmp);
}
catch(exception &e)
{
cerr << "Canno add msg to send queue... " << e.what() << endl;
}
}
}
The issue is the same with both receive
and send
threads: runtime error (writing access violation somewhere in boost libraries).
// Abbreviated view of startReceiving: "..." stands for the statements left
// out, and the arguments of async_receive are omitted as well.
void TCPClient::startReceiving()
{
...
m_Socket.async_receive(); //runtime error here
}
In the sequential version everything works fine (but I don't know how to implement multiple sends before an answer arrives). Can anybody tell me how to fix the issue or how to implement this another way? Maybe pooling can help, but I'm not sure that it is a good way.
Upvotes: 7
Views: 20029
Reputation: 8948
boost::asio::ip::tcp::socket::async_read_some, as the name suggests, is not guaranteed to read the complete data. It sets the error
object to boost::asio::error::eof
when the client has finished writing.
The error you are getting is because of this:
server part
// Shape of the error handling in the server's completion handlers
// ("..." stands for the success path).
if (!error)
{
...
}
else
{
// Reached for every error code, end-of-file included: the session is destroyed.
delete this;
}
In the else
block, you are assuming that this is an error case and closing the connection. This is not always the case. Before the else
you need to check for error == boost::asio::error::eof
.
Apart from this in read handler, you should keep collecting whatever is read in a buffer till you hit error == boost::asio::error::eof
. Only then you should validate read data and write back to client.
Take a look at HTTP server 1, 2, 3 implementation in examples section.
Update: Answer to updated question
You have thread synchronization issue with the updated code.
msgQueue
is simultaneously accessed from two or more threads without any lock. If I understood your problem correctly, you want to:
You can use two boost::asio::io_service::strands for the two tasks. When using Asio, strands are the way to synchronize your tasks. Asio makes sure that tasks posted in a strand are executed synchronously.
In strand1
post a send
task that looks like: read_user_input -> send_to_server -> handle_send -> read_user_input
In strand2
post a read
task that looks like: read_some -> handle_read -> read_some
This will make sure msgQueue
is not accessed simultaneously from two threads. Use two sockets for read and write to server, to make sure simultaneous read and write is not called on the same socket.
Upvotes: 4