Reputation: 39
I make an asynchronous chat server in C++ using the boost library. Almost everything works fine.
There are two ways for a client to disconnect:
The former is OK. However, the latter has a problem. If a client disconnects with "exit", the next message, sent by another client, appears without the first several characters. After that it's OK.
For example: Several clients chat. One of them disconnects with "exit". After that, another client sends "0123456789abcdefghijk" and all clients receive only: "abcdefghijk". I don't know where's the problem, I guess it's something about streambuf. I found similar problem (almost the same) but in C#.
Here's the code:
#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>
using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;
typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;
const string waitingMsg("Waiting for clients...\n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;
io_service service;
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
boost::mutex mtx;
clientList_ptr clientList(new list<socket_ptr>);
nameList_ptr nameList(new list<string>);
boost::asio::streambuf buff;
time_t timer;
void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);
int main(int argc, char* argv[])
{
try
{
vector<boost::shared_ptr<boost::thread> > threads;
ready();
for (int i = 0; i < THREADS; i++)
{
boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
threads.push_back(t);
}
for (int i = 0; i < THREADS; i++)
{
threads[i]->join();
}
}
catch (std::exception& error)
{
cerr << error.what() << endl;
}
return 0;
}
void ready()
{
cout << waitingMsg;
accepting();
}
void accepting()
{
socket_ptr clientSock(new tcp::socket(service));
acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}
void askForName(socket_ptr sock, const boost::system::error_code& error)
{
if (error)
{
cerr << "Error on accepting: " << error.message() << endl;
}
boost::asio::async_write(*sock, buffer("Please, enter your name:\n"),
boost::bind(&receiveName, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
accepting();
}
void receiveName(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if (error)
{
cerr << errorWritingMsg << error.message() << endl;
}
boost::asio::async_read_until(*sock, buff, '\n',
boost::bind(&identify, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void identify(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
return;
}
string_ptr name(new string(""));
if (!extract(name, bytes_transferred))
{
return;
}
if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
{
boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:\n"),
boost::bind(&receiveName, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
return;
}
nameList->emplace_back(*name);
accepted(sock, name);
}
void accepted(socket_ptr sock, string_ptr name)
{
mtx.lock();
clientList->emplace_back(sock);
mtx.unlock();
notification(sock, name, "New client: ", " joined. ");
receiveMessage(sock, name);
}
void receiveMessage(socket_ptr sock, string_ptr name)
{
boost::asio::async_read_until(*sock, buff, '\n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
disconnectClient(sock, name, error);
return;
}
if(!clientList->empty())
{
//mtx.lock();
string_ptr message(new string(""));
if(!extract(message, bytes_transferred))
{
//mtx.unlock();
disconnectClient(sock, name, error);
return;
}
*message = *name + ": " + *message + "\n";
cout << "ChatLog: " << *message << endl;
writeMessage(sock, message);
receiveMessage(sock, name);
//mtx.unlock();
}
}
bool extract(string_ptr message, std::size_t bytes_transferred)
{
mtx.lock();
buff.commit(bytes_transferred);
std::istream istrm(&buff);
//mtx.unlock();
std::getline(istrm, *message);
buff.consume(buff.size());
string_ptr msgEndl(new string(*message + "\n"));
mtx.unlock();
if(clientSentExit(msgEndl))
{
return false;
}
return true;
}
bool clientSentExit(string_ptr message)
{
return message->compare(0, 5, "exit\n") == 0;
}
void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
boost::system::error_code ec = error;
auto position = find(clientList->begin(), clientList->end(), sock);
auto namePos = find(nameList->begin(), nameList->end(), *name);
sock->shutdown(tcp::socket::shutdown_both, ec);
if (ec)
{
cerr << "Error on shutting: " << ec.message() << endl;
}
sock->close(ec);
if(ec)
{
cerr << "Error on closing: " << ec.message() << endl;
}
clientList->erase(position);
nameList->erase(namePos);
notification(sock, name, "", " disconnected. ");
}
void writeMessage(socket_ptr sock, string_ptr message)
{
for(auto& cliSock : *clientList)
{
if (cliSock->is_open() && cliSock != sock)
{
boost::asio::async_write(*cliSock, buffer(*message),
boost::bind(&responseSent, boost::asio::placeholders::error));
}
}
}
void responseSent(const boost::system::error_code& error)
{
if (error)
{
cerr << "Error on writing: " << error.message() << endl;
}
}
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
string_ptr serviceMsg (new string (headOfMsg + *name + tailOfMsg));
cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;
*serviceMsg = *serviceMsg + "\n";
writeMessage(sock, serviceMsg);
cout << waitingMsg;
}
It's interesting that I have similar synchronous server with the same way of using of streambuf, but there are no such problems.
Upvotes: 2
Views: 493
Reputation: 39
I think I fixed the issue. Just created a list of streambufs - a streambuf for each client. But I had to keep the consume() function, because otherwise the check if a given name already exists failed resulting in possibility to have several clients sharing the same name. Then messaging stopped working but I removed the locks in extract() and now everything appears to be all right.
Upvotes: 0
Reputation: 10155
boost::asio::async_read_until() can read any amount of characters to streambuf after \n. It then gives you bytes_transferred, which is count of characters in the first line (not necessarily the count of characters that were read to the buffer). See documentation.
As long as you keep your buffer variable intact, next boost::asio::async_read_until() will read characters first from the buffer and then from the socket.
It seems to me that you read a line from the buffer using getline(), which is correct. After that, you call
buff.consume(buff.size());
which clears the buffer, removing all information about the partial lines you may have received. The first complete line that you read has already been removed from the buffer by getline(), so the consume() call is unnecessary in any case.
Just removing the consume() call would not solve your problem, because you seem to use one buffer that is shared between all clients, and you would not know what partial data was from which client. A possible solution could be creating a list of buffers (one for each client), just like you have a list of sockets. Then boost::asio::async_read_until() and getline() would take care of handling the partial data, and you wouldn't have to think about that.
Upvotes: 2
Reputation: 392893
The other answer explains what went wrong.
However it can be a bit tricky to find out how to fix it.
Maybe you can find inspiration from a similar question I handled here:
Here, the OP was having trouble with basically the same thing: after reading his HTTP headers he would have "dropped" part of the body. So I added logic:
NOTE Again, it's important not to assume that the end-of-headers coincides with the packet boundary. Therefore, start
read_body()
with draining the available input already received.
std::shared_ptr<std::vector<char> > bufptr = std::make_shared<std::vector<char> >(nbuffer);
auto partial = std::copy(
std::istreambuf_iterator<char>(&pThis->buff), {},
bufptr->begin());
std::size_t already_received = std::distance(bufptr->begin(), partial);
assert(nbuffer >= already_received);
nbuffer -= already_received;
Upvotes: 2