Reputation: 360
I need to handle incoming TCP connections on individual QThreads.
After successful client authentication, the corresponding socket should be stored in a QList
object.
[simplified main/server-side application]
/**
 * Server that accepts TCP connections and keeps the sockets that passed
 * authentication.
 *
 * handleIncomingConnection() and handleWaiterThread() are wired up through
 * the string-based SIGNAL()/SLOT() macros, so they have to be declared as
 * slots for those connections to resolve at runtime.
 */
class Server : public QObject
{
    Q_OBJECT

public:
    Server();

private:
    QList<QTcpSocket*> m_connections; // sockets added after authentication
    QTcpServer m_server;              // server whose newConnection() is handled

private slots:
    void handleIncomingConnection();
    void handleWaiterThread();
    void treatFinishedWaiterThread();
};
[according function definitions]
handleIncomingConnection()
slot is connected with the server object's (m_server) newConnection()
signal.
// Creates a new QThread, hooks its started()/finished() signals up to this
// object's handlers and starts it.
void Server::handleIncomingConnection()
{
QThread *waiter = new QThread();
connect(waiter, SIGNAL(started()), this, SLOT(handleWaiterThread()));
connect(waiter, SIGNAL(finished()), this, SLOT(treatFinishedWaiterThread()));
// NOTE(review): this unqualified call is this->moveToThread(waiter), i.e. it
// tries to move the Server object itself rather than a per-connection worker.
moveToThread(waiter);
// NOTE(review): nothing visible in this function calls waiter->quit(), so
// finished() presumably never fires -- confirm.
waiter->start();
}
/**
 * Invoked when a waiter thread has started: fetches the next pending
 * connection and, after authentication, stores its socket.
 */
void Server::handleWaiterThread()
{
    // fetch requesting socket; m_server is a value member, so use '.' not '->'
    QTcpSocket *socket = m_server.nextPendingConnection();
    if (socket == nullptr)
    {
        // No connection is pending, so there is nothing to authenticate.
        return;
    }

    // HANDLE PASSWORD AUTHENTICATION HERE ...
    // IF SUCCESSFUL, CONTINUE
    connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));

    // add to list
    m_connections.append(socket);
}
/**
 * Schedules the QThread that emitted finished() for deletion.
 */
void Server::treatFinishedWaiterThread()
{
    QThread *caller = qobject_cast<QThread*>(sender());
    if (caller == nullptr)
    {
        // Called directly (no sender) or by something that is not a QThread.
        return;
    }
    caller->deleteLater();
}
If I try to run this, the threads get created but no SIGNAL is emitted when they're done, so I can't delete threads afterwards. Additionally I get this message:
QObject::moveToThread: Widgets cannot be moved to a new thread
How to fix this?
According to QTcpServer::nextPendingConnection() it says:
The returned QTcpSocket object cannot be used from another thread. If you want to use an incoming connection from another thread, you need to override incomingConnection().
So in the end I have to create another class that inherits from QTcpServer
.
I revised my code and added a custom server and waiter thread class.
[custom server class]
class CustomServer : public QTcpServer
{
Q_OBJECT
public:
WServer(QObject* = nullptr) : QTcpServer(parent) {}
signals:
void connectionRequest(qintptr);
protected:
void incomingConnection(qintptr socketDescriptor)
{
emit connectionRequest(socketDescriptor);
}
};
[custom thread class]
/**
 * Thread object that wraps a native socket descriptor in a QTcpSocket and
 * publishes that socket through newSocket() at the end of run().
 */
class Waiter : public QThread
{
    Q_OBJECT

public:
    Waiter(qintptr socketDescriptor, QObject *parent = nullptr)
        : QThread(parent),
          m_socket(new QTcpSocket(this))
    {
        // Adopt the descriptor handed over by the server.
        m_socket->setSocketDescriptor(socketDescriptor);
    }

signals:
    void newSocket(QTcpSocket*);

protected:
    void run()
    {
        // DO STUFF HERE
        msleep(2500);
        emit newSocket(m_socket);
    }

private:
    QTcpSocket *m_socket; // created in the constructor, parented to this thread object
};
[new main class]
/**
 * Main widget: owns the server and reacts to its connection requests.
 */
class ServerGUI : public QWidget
{
    Q_OBJECT

public:
    // Constructor carries the class name and takes the QWidget parent; the
    // default lets main() create the window without arguments.
    ServerGUI(QWidget *parent = nullptr);

private:
    QList<QTcpSocket*> m_connections;
    CustomServer m_server;

private slots:
    void handleConnectionRequest(qintptr);
    void handleNewSocket(QTcpSocket*);
};
void CustomServer::handleConnectionRequest(qintptr socketDescriptor)
{
Waiter *nextWaiter = new Waiter(socketDescriptor, this);
connect(nextWaiter, SIGNAL(newSocket(QTcpSocket*)), this, SLOT(handleNewSocket(QTcpSocket*)));
connect(nextWaiter, SIGNAL(finished()), this, SLOT(deleteLater()));
nextWaiter->start();
}
void CustomServer::handleNewSocket(QTcpSocket *socket)
{
// DO STUFF HERE ...
connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));
// FINALLY ADD TO ACTIVE-CLIENT LIST ...
}
Since CustomServer
is defined as a class member (m_server) within my main widget class (that handles GUI; called ServerGUI
),
connectionRequest(qintptr)
signal of m_server gets connected with handleConnectionRequest(qintptr)
slot of ServerGUI
instance.
But now my application is crashing immediately after startup, showing following message in debug window:
HEAP[qtapp.exe]: Invalid address specified to RtlValidateHeap( 000002204F430000, 0000006E0090F4C0 )
What may be the cause of this?
I adapted my code according to user2014561's answer.
CustomServer
classclass CustomServer : public QTcpServer
{
Q_OBJECT
public:
WServer(QHostAddress, quint16, quint16, QObject* = nullptr);
~WServer();
void kickAll();
void kickClient(qintptr);
QHostAddress localAddress() const;
quint16 serverPort() const;
bool isReady() const;
bool alreadyConnected(qintptr) const;
bool clientLimitExhausted() const;
signals:
void clientConnected(qintptr);
void clientDisconnected(qintptr);
private slots:
void destroyedfunc(QObject*);
// JUST FOR TESTING PURPOSES
void waiterFinished();
private:
QList<ServerPeer*> m_connections;
quint16 m_maxAllowedClients;
bool m_readyState;
void incomingConnection(qintptr);
};
for kickAll()
:
void WServer::kickAll()
{
while (!m_connections.isEmpty())
{
ServerPeer *peer = m_connections.first();
QEventLoop loop;
connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit())); // ### PROBLEM ENCOUNTERED HERE
QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
loop.exec();
}
}
for kickClient(qintptr)
:
void WServer::kickClient(qintptr client_id)
{
foreach (ServerPeer *peer, m_connections)
{
bool peerState;
QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection,
Q_RETURN_ARG(bool, peerState), Q_ARG(qintptr, client_id));
if (peerState)
{
QEventLoop loop;
connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
loop.exec();
break;
}
}
}
for destroyedfunc(QObject*)
:
// Drops a destroyed peer from the connection list. The pointer is only used
// as a lookup key and is never dereferenced.
void CustomServer::destroyedfunc(QObject *obj)
{
    m_connections.removeAll(static_cast<ServerPeer*>(obj));
}
for incomingConnection(qintptr)
:
void WServer::incomingConnection(qintptr handle)
{
ServerPeer *peer = new ServerPeer();
QThread *waiter = new QThread();
m_connections.append(peer); // add to list
peer->moveToThread(waiter);
// notify about client connect
connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
// stop waiter thread by indirectly raising finished() signal
connect(peer, SIGNAL(finished()), waiter, SLOT(quit()));
// notify about client disconnect
connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
// remove client from list
connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
// notify about finished waiter thread; only for debug purposes
connect(waiter, SIGNAL(finished()), this, SLOT(waiterFinished()));
// remove waiter thread when finished
connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));
QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection,
Q_ARG(qintptr, handle));
waiter->start();
}
ServerPeer
classclass ServerPeer : public QObject
{
Q_OBJECT
public:
ServerPeer(QObject* = nullptr);
~ServerPeer();
bool hasSocket(qintptr) const;
signals:
void connected(qintptr);
void disconnected(qintptr);
void finished();
public slots:
void start(qintptr);
void disconnect();
private slots :
void notifyConnect();
void notifyDisconnect();
private:
QTcpSocket *m_peer;
qintptr m_id;
};
for ServerPeer(QObject*)
:
// Creates a peer without a socket; m_id starts at -1 so it never holds an
// indeterminate value before a socket id is assigned.
ServerPeer::ServerPeer(QObject *parent) : QObject(parent), m_peer(nullptr), m_id(-1)
{
}
for ~ServerPeer()
:
// Tears the socket down through this class's own disconnect() slot.
ServerPeer::~ServerPeer()
{
    this->disconnect();
}
for start(qintptr)
:
/**
 * Adopts the native socket descriptor and, once verified, keeps serving the
 * client until it disconnects.
 *
 * finished() is only emitted when this peer is done (setup failed,
 * verification failed, or the client disconnected). Emitting it
 * unconditionally would end the peer as soon as it had started.
 */
void ServerPeer::start(qintptr handle)
{
    qDebug() << "New waiter thread has been started.";
    m_peer = new QTcpSocket(this);
    if (!m_peer->setSocketDescriptor(handle))
    {
        this->deleteLater();
        emit finished();
        return;
    }

    // Remember the id so it can be reported and compared later.
    m_id = handle;

    if (true /*verification here*/)
    {
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));
        // The peer is finished once its client has gone away.
        connect(m_peer, SIGNAL(disconnected()), this, SIGNAL(finished()));
        // manually do connected notification
        QTimer::singleShot(0, this, SLOT(notifyConnect()));
    }
    else
    {
        this->deleteLater();
        emit finished();
    }
}
for disconnect()
:
void ServerPeer::disconnect()
{
if (m_peer != nullptr)
{
if (m_peer->state() != QAbstractSocket::SocketState::ClosingState
&& m_peer->state() != QAbstractSocket::SocketState::UnconnectedState)
m_peer->abort();
delete m_peer;
m_peer = nullptr;
}
}
for notifyConnect()
:
void ServerPeer::notifyConnect()
{
emit connected(m_peer);
}
for notifyDisconnect()
:
// Announces the disconnect by socket id. disconnected() carries a qintptr,
// so the stored id member is emitted rather than the QTcpSocket pointer.
void ServerPeer::notifyDisconnect()
{
    emit disconnected(m_id);
}
ServerGUI
classclass ServerGUI : public QWidget
{
Q_OBJECT
public:
ServerGUI(QWidget* = nullptr);
private:
Ui::ServerWindow ui;
CustomServer *m_server;
private slots:
// For further handling, e.g. updating client view
void handleNewClient(qintptr);
void handleRemovedClient(qintptr);
}
for ServerGUI(QWidget*)
:
/**
 * Builds the GUI, creates the server and subscribes to its client
 * notifications. m_server is left as nullptr when the server is not ready.
 */
ServerGUI::ServerGUI(QWidget *parent) : QWidget(parent)
{
    // initialize gui elements;
    // GENERATED WITH ACCORDING *.ui FILE
    ui.setupUi(this);

    // qintptr travels through queued (cross-thread) signals and invokeMethod
    // calls, which requires the type to be registered with the meta-type system.
    qRegisterMetaType<qintptr>("qintptr");

    // The member is a CustomServer*, so construct that class.
    m_server = new CustomServer(QHostAddress::LocalHost, 1234, 2, this);
    if (!m_server->isReady())
    {
        qDebug() << "Server could not start!";
        delete m_server;
        m_server = nullptr;
        return;
    }

    connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
    connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
}
And here my main function:
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
ServerGUI w;
w.show();
return a.exec();
}
With given code following message pops up if I try to kick a (selected) client:
QMetaObject::invokeMethod: No such method ServerPeer::hasSocket(qintptr)
QObject::connect: Cannot connect (null)::destroyed() to QEventLoop::quit()
How to fix this?
Upvotes: 3
Views: 3825
Reputation: 2881
If I understood correctly, you want to run each peer of your server on a separate thread. If so, the following steps may help you:
1. Subclass QTcpServer.
2. Reimplement its incomingConnection() method.
3. Create an instance of QThread and of ServerPeer, and start the thread.
4. Use SIGNAL - SLOT connections to remove the peer from the list and delete the thread and the peer instances.
5. Add the ServerPeer to your QList.
Edit, considerations:
You don't get the connected
SIGNAL
because the socket is already connected by the time you set its socket descriptor, so you can simply assume after setSocketDescriptor
that the socket is connected and do what you want.
About the error when closing: it happens because you are not releasing the threads properly; see my edit for how you can solve that.
Finally, QTcpSocket
must not be accessed by different threads; if you need to call ServerPeer
from another thread, use QMetaObject::invokeMethod
with QueuedConnection
or BlockingQueuedConnection,
or the SIGNAL
SLOT
mechanism.
Edit 2:
Now the server and its peers will be deleted in MainWindow::closeEvent,
and that way you can see the disconnected function being called. I guess the problem depends on the order in which the classes are deleted.
You can interact with the socket, including sending data through it, but I believe it will be painless to use the Qt methods for cross-thread calls already mentioned. In my example you can easily write to a specific peer or to all peers.
//Step 1
#include <QtCore>
#include <QtNetwork>
#include "serverpeer.h"
class CustomServer : public QTcpServer
{
Q_OBJECT
public:
explicit CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent = nullptr);
~CustomServer();
void kickAll();
void kickClient(qintptr id);
void writeData(const QByteArray &data, qintptr id);
void writeData(const QByteArray &data);
QHostAddress localAddress();
quint16 serverPort();
bool isReady();
signals:
void clientConnected(qintptr);
void clientDisconnected(qintptr);
private slots:
void destroyedfunc(QObject *obj);
private:
void incomingConnection(qintptr handle);
QList<ServerPeer*> m_connections;
int m_maxAllowedClients;
};
#include "customserver.h"
/**
 * Stores the client limit and starts listening on host:port.
 *
 * The base class is listed first because bases are always initialised before
 * members; listing them in that order avoids a reorder warning.
 */
CustomServer::CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent) :
    QTcpServer(parent),
    m_maxAllowedClients(maxconnections)
{
    // The result is observable through isReady().
    listen(host, port);
}
/**
 * Stops accepting connections and removes every remaining peer.
 */
CustomServer::~CustomServer()
{
    // kickAll() spins nested event loops; stop listening first so no new
    // peer can be accepted while the existing ones are being removed.
    close();
    kickAll();
}
//Step 2
/**
 * Accepts a connection: rejects it when the client limit is reached,
 * otherwise creates a ServerPeer on its own thread and registers it.
 */
void CustomServer::incomingConnection(qintptr handle)
{
    // handle client limit
    if (m_connections.size() >= m_maxAllowedClients)
    {
        qDebug() << "Can't allow new connection: client limit reached!";
        // A stack socket adopts the descriptor only to abort it; it is
        // released automatically on return.
        QTcpSocket socket;
        socket.setSocketDescriptor(handle);
        socket.abort();
        return;
    }

    //Step 3
    ServerPeer *peer = new ServerPeer();
    QThread *waiter = new QThread();
    peer->moveToThread(waiter);

    //Step 4
    connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
    connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
    // Destroying the peer stops its thread and removes it from the list.
    connect(peer, SIGNAL(destroyed()), waiter, SLOT(quit()));
    connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
    // The thread object deletes itself once it has finished.
    connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));

    // Queued so that start() runs once the waiter thread processes events.
    QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection, Q_ARG(qintptr, handle));
    waiter->start();

    //Step 5
    m_connections.append(peer);
}
// Removes every peer, one at a time: requests the peer's deletion via a
// queued deleteLater and blocks in a local event loop until that peer's
// thread object emits destroyed().
void CustomServer::kickAll()
{
while (!m_connections.isEmpty())
{
ServerPeer *peer = m_connections.first();
QEventLoop loop;
// The loop ends when the peer's QThread object has been destroyed.
connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
// NOTE(review): the while condition relies on the peer being removed from
// m_connections while this loop runs -- presumably through destroyedfunc();
// confirm.
loop.exec();
}
}
// Removes the first peer that reports MyTRUE for the given id and waits
// until that peer's thread object has been destroyed.
void CustomServer::kickClient(qintptr id)
{
foreach (ServerPeer *peer, m_connections)
{
ServerPeer::State hassocket;
// Blocking queued call: hasSocket() is invoked with a blocking connection
// and its result is written to hassocket.
QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection, Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
if (hassocket == ServerPeer::MyTRUE)
{
QEventLoop loop;
// The loop ends when the peer's QThread object has been destroyed.
connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
loop.exec();
// Only the first matching peer is kicked.
break;
}
}
}
void CustomServer::writeData(const QByteArray &data)
{
foreach (ServerPeer *peer, m_connections)
QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
}
void CustomServer::writeData(const QByteArray &data, qintptr id)
{
foreach (ServerPeer *peer, m_connections)
{
ServerPeer::State hassocket;
QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection, Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
if (hassocket == ServerPeer::MyTRUE)
{
QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
break;
}
}
}
// Returns the address reported by the base class.
QHostAddress CustomServer::localAddress()
{
    const QHostAddress address = QTcpServer::serverAddress();
    return address;
}
// Returns the port reported by the base class. The call has to stay
// qualified, otherwise this function would call itself.
quint16 CustomServer::serverPort()
{
    const quint16 port = QTcpServer::serverPort();
    return port;
}
// True while the base class reports that it is listening.
bool CustomServer::isReady()
{
    const bool listening = QTcpServer::isListening();
    return listening;
}
// Drops a destroyed peer from the connection list. The pointer is only used
// as a lookup key and is never dereferenced.
void CustomServer::destroyedfunc(QObject *obj)
{
    m_connections.removeAll(static_cast<ServerPeer*>(obj));
}
#include <QtCore>
#include <QtNetwork>
/**
 * Worker object that owns the socket of one connected client.
 */
class ServerPeer : public QObject
{
    Q_OBJECT

public:
    explicit ServerPeer(QObject *parent = nullptr);
    ~ServerPeer();

    // Result type of hasSocket().
    enum State
    {
        MyTRUE,
        MyFALSE
    };

signals:
    void connected(qintptr id);
    void disconnected(qintptr id);

public slots:
    ServerPeer::State hasSocket(qintptr id);
    void start(qintptr handle);
    void writeData(const QByteArray &data);

private slots:
    void readyRead();
    void notifyConnect();
    void notifyDisconnect();

private:
    QTcpSocket *m_peer = nullptr;
    // Initialised so hasSocket() never compares an indeterminate value
    // before start() has assigned the real id.
    qintptr m_id = -1;
};
#include "serverpeer.h"
// Creates a peer without a socket. m_id starts at -1 so hasSocket() never
// reads an indeterminate value before start() assigns the real id.
ServerPeer::ServerPeer(QObject *parent) : QObject(parent), m_peer(nullptr), m_id(-1)
{
}
// Aborts the socket if one was created. The socket itself is parented to
// this object (see start()), so it is not deleted here.
ServerPeer::~ServerPeer()
{
    if (m_peer != nullptr)
    {
        m_peer->abort();
    }
}
// Reports MyTRUE when id equals this peer's stored id, MyFALSE otherwise.
ServerPeer::State ServerPeer::hasSocket(qintptr id)
{
    return (m_id == id) ? MyTRUE : MyFALSE;
}
/**
 * Adopts the native socket descriptor and, once verified, announces the
 * connection and starts serving the client.
 *
 * If the descriptor cannot be adopted the peer schedules its own deletion.
 */
void ServerPeer::start(qintptr handle)
{
    m_peer = new QTcpSocket(this);
    if (!m_peer->setSocketDescriptor(handle))
    {
        // Without a usable socket there is nothing to serve.
        qWarning() << "Could not adopt socket descriptor:" << m_peer->errorString();
        this->deleteLater();
        return;
    }

    //Step 6
    if (true /*verification here*/)
    {
        m_id = handle;
        // Deferred by a zero-length timer rather than emitted directly.
        QTimer::singleShot(0, this, SLOT(notifyConnect()));
        connect(m_peer, SIGNAL(readyRead()), this, SLOT(readyRead()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));
    }
    else
    {
        m_peer->abort();
        this->deleteLater();
    }
}
void ServerPeer::readyRead()
{
qDebug() << m_peer->readAll() << QThread::currentThread();
}
void ServerPeer::writeData(const QByteArray &data)
{
m_peer->write(data);
m_peer->flush();
}
void ServerPeer::notifyConnect()
{
emit connected(m_id);
}
void ServerPeer::notifyDisconnect()
{
emit disconnected(m_id);
}
// Sets up the UI, registers qintptr as a meta-type, creates the server and,
// if it is ready, subscribes to its client notifications. When the server
// is not ready it is deleted again and m_server is left as nullptr.
MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);
    qRegisterMetaType<qintptr>("qintptr");

    m_server = new CustomServer(QHostAddress::LocalHost, 1024, 2, this);
    if (m_server->isReady())
    {
        connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
        connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
    }
    else
    {
        qDebug() << "Server could not start!";
        delete m_server;
        m_server = nullptr;
    }
}
// Releases the UI object that was allocated in the constructor.
MainWindow::~MainWindow()
{
delete ui;
}
// Destroys the server when the window is being closed. Deleting a null
// pointer is a no-op, so no explicit check is needed.
void MainWindow::closeEvent(QCloseEvent *)
{
    delete m_server;
    m_server = nullptr;
}
// Logs the new client, greets it individually and then announces it to
// every connected peer.
void MainWindow::handleNewClient(qintptr id)
{
    qDebug() << __FUNCTION__ << id;

    const QByteArray greeting = QString("Hello client id: %0\r\n").arg(id).toLatin1();
    m_server->writeData(greeting, id);

    const QByteArray announcement = QString("New client id: %0\r\n").arg(id).toLatin1();
    m_server->writeData(announcement);
}
// Logs the function name together with the id of the removed client.
void MainWindow::handleRemovedClient(qintptr id)
{
qDebug() << __FUNCTION__ << id;
}
Upvotes: 1