neuronalbit
neuronalbit

Reputation: 360

QTcpSocket handling in another QThread

I need to handle incoming TCP connections on individual QThreads. After successful client authentication, the corresponding socket should be stored in a QList object.

[simplified main/server-side application]

// First attempt: a QObject that owns the listening server and the list of
// accepted client sockets.
class Server : public QObject
{
    Q_OBJECT

public:
    Server();

private:
    // Sockets appended by handleWaiterThread().
    QList<QTcpSocket*> m_connections;
    // Listening server; declared by value in this class.
    QTcpServer m_server;
    // Creates and starts one QThread per call.
    void handleIncomingConnection();
    // Connected to QThread::started() inside handleIncomingConnection().
    void handleWaiterThread();

private slots:
    // Connected to QThread::finished() inside handleIncomingConnection().
    void treatFinishedWaiterThread();
}

[according function definitions]

handleIncomingConnection() slot is connected with the server object's (m_server) newConnection() signal.

// Creates a new QThread, connects its started()/finished() signals to this
// object's handlers, moves this object to that thread and starts the thread.
void Server::handleIncomingConnection()
{
    QThread *waiter = new QThread();
    connect(waiter, SIGNAL(started()), this, SLOT(handleWaiterThread()));
    connect(waiter, SIGNAL(finished()), this, SLOT(treatFinishedWaiterThread()));
    // Called without an explicit object, i.e. on this Server instance itself.
    moveToThread(waiter);
    waiter->start();
}
// Handler wired to QThread::started(): takes the next pending connection from
// m_server, connects its disconnected() signal and stores it in m_connections.
void Server::handleWaiterThread()
{
    // fetch requesting socket
    QTcpSocket *socket = m_server->nextPendingConnection();

    // HANDLE PASSWORD AUTHENTICATION HERE ...
    // IF SUCCESSFUL, CONTINUE
    
    // clientDisconnected() is referenced as a slot of this object here.
    connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));

    // add to list
    m_connections.append(socket);
}
// Slot wired to QThread::finished(): casts the signal sender to QThread and
// schedules that object for deletion.
void Server::treatFinishedWaiterThread()
{
    qobject_cast<QThread*>(sender())->deleteLater();
}

If I try to run this, the threads get created but no SIGNAL is emitted when they're done, so I can't delete threads afterwards. Additionally I get this message:

QObject::moveToThread: Widgets cannot be moved to a new thread

How to fix this?




[01.06.2016]

According to QTcpServer::nextPendingConnection() it says:

The returned QTcpSocket object cannot be used from another thread. If you want to use an incoming connection from another thread, you need to override incomingConnection().

So in the end I have to create another class that inherits from QTcpServer.




[01.07.2016 #1]

I revised my code and added a custom server and waiter thread class.

[custom server class]

// QTcpServer subclass that does not create a socket for an incoming
// connection; it only republishes the socket descriptor through the
// connectionRequest() signal.
class CustomServer : public QTcpServer
{
    Q_OBJECT

public:
    // Forwards to the QTcpServer constructor.
    WServer(QObject* = nullptr) : QTcpServer(parent) {}

signals:
    // Carries the descriptor received in incomingConnection().
    void connectionRequest(qintptr);

protected:
    // Emits connectionRequest() with the descriptor it was given.
    void incomingConnection(qintptr socketDescriptor)
    {
        emit connectionRequest(socketDescriptor);
    }
};

[custom thread class]

// QThread subclass that wraps one socket descriptor in a QTcpSocket and
// announces the socket through newSocket() from run().
class Waiter : public QThread
{
    Q_OBJECT

public:
    // The socket is created inside the constructor with this Waiter object as
    // its QObject parent, and then takes over socketDescriptor.
    Waiter(qintptr socketDescriptor, QObject *parent = nullptr)
        : QThread(parent)
    {
        // Create socket
        m_socket = new QTcpSocket(this);
        m_socket->setSocketDescriptor(socketDescriptor);
    }

signals:
    // Emitted at the end of run() with the socket created in the constructor.
    void newSocket(QTcpSocket*);

protected:
    // Sleeps for 2500 ms as a placeholder for real work, then emits newSocket().
    void run()
    {
        // DO STUFF HERE
        msleep(2500);
        
        emit newSocket(m_socket);
    }

private:
    // Socket allocated in the constructor.
    QTcpSocket *m_socket;
};

[new main class]

// Main widget of the second attempt: holds the CustomServer by value and the
// list of client sockets, and declares the slots that react to the server.
class ServerGUI : public QWidget
{
    Q_OBJECT

public:
    Server(QObject*);

private:
    // Active client sockets.
    QList<QTcpSocket*> m_connections;
    // Server instance stored by value as a member.
    CustomServer m_server;

private slots:
    // Receives a socket descriptor for a new connection request.
    void handleConnectionRequest(qintptr);
    // Receives the socket announced through Waiter::newSocket().
    void handleNewSocket(QTcpSocket*);
}
// Creates a Waiter thread for the given descriptor (parented to this object),
// wires its signals and starts it.
void CustomServer::handleConnectionRequest(qintptr socketDescriptor)
{
    Waiter *nextWaiter = new Waiter(socketDescriptor, this);
    connect(nextWaiter, SIGNAL(newSocket(QTcpSocket*)), this, SLOT(handleNewSocket(QTcpSocket*)));
    // The receiver of deleteLater() in this connection is `this`, not nextWaiter.
    connect(nextWaiter, SIGNAL(finished()), this, SLOT(deleteLater()));
    nextWaiter->start();
}
// Slot for Waiter::newSocket(): hooks the socket's disconnected() signal to
// this object's clientDisconnected() slot; the remaining steps are placeholders.
void CustomServer::handleNewSocket(QTcpSocket *socket)
{
    // DO STUFF HERE ...

    connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));

    // FINALLY ADD TO ACTIVE-CLIENT LIST ...
}

Signal & Slot specific settings:

Since CustomServer is defined as a class member (m_server) within my main widget class (that handles GUI; called ServerGUI), connectionRequest(qintptr) signal of m_server gets connected with handleConnectionRequest(qintptr) slot of ServerGUI instance.

But now my application is crashing immediately after startup, showing following message in debug window:

HEAP[qtapp.exe]: Invalid address specified to RtlValidateHeap( 000002204F430000, 0000006E0090F4C0 )

What may be the cause of this?




[01.10.2016 #2]

I adapted my code according to user2014561's answer.

for CustomServer class

// Third revision of the server: keeps a list of ServerPeer objects, exposes
// kick operations and re-emits peer connect/disconnect notifications.
class CustomServer : public QTcpServer
{
    Q_OBJECT

public:
    WServer(QHostAddress, quint16, quint16, QObject* = nullptr);
    ~WServer();

    // Remove every peer / the peer matching the given id.
    void kickAll();
    void kickClient(qintptr);

    QHostAddress localAddress() const;
    quint16 serverPort() const;
    bool isReady() const;
    bool alreadyConnected(qintptr) const;
    bool clientLimitExhausted() const;

signals:
    // Forwarded from ServerPeer::connected() / ServerPeer::disconnected().
    void clientConnected(qintptr);
    void clientDisconnected(qintptr);

private slots:
    // Connected to ServerPeer::destroyed(QObject*); removes the peer from the list.
    void destroyedfunc(QObject*);

    // JUST FOR TESTING PURPOSES
    void waiterFinished();

private:
    // One entry per peer created in incomingConnection().
    QList<ServerPeer*> m_connections;
    quint16 m_maxAllowedClients;
    bool m_readyState;

    // Creates a ServerPeer plus a QThread for each incoming descriptor.
    void incomingConnection(qintptr);
};

for kickAll():

// Repeatedly takes the first peer in m_connections, queues deleteLater() on
// it and spins a local event loop until the peer's thread object emits
// destroyed().
void WServer::kickAll()
{
    while (!m_connections.isEmpty())
    {
        ServerPeer *peer = m_connections.first();
        QEventLoop loop;
        // peer->thread() is used as the signal sender for this connection.
        connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit())); // ### PROBLEM ENCOUNTERED HERE
        QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
        loop.exec();
    }
}

for kickClient(qintptr):

// Asks each peer (through the meta-object system, blocking) whether it owns
// client_id; the first peer that answers true is deleted the same way as in
// kickAll() and the search stops.
void WServer::kickClient(qintptr client_id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        bool peerState;
        // "hasSocket" is looked up by name at run time via invokeMethod().
        QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection,
            Q_RETURN_ARG(bool, peerState), Q_ARG(qintptr, client_id));
        if (peerState)
        {
            QEventLoop loop;
            connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
            QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
            loop.exec();
            break;
        }
    }
}

for destroyedfunc(QObject*):

// Slot for ServerPeer::destroyed(QObject*): drops every list entry that
// equals the destroyed object's address.
void CustomServer::destroyedfunc(QObject *obj)
{
    m_connections.removeAll(static_cast<ServerPeer*>(obj));
}

for incomingConnection(qintptr):

// Creates a ServerPeer and a QThread for the incoming descriptor, records the
// peer, moves it to the thread, wires the notification/cleanup signals, queues
// ServerPeer::start(handle) and starts the thread.
void WServer::incomingConnection(qintptr handle)
{
    ServerPeer *peer = new ServerPeer();
    QThread *waiter = new QThread();

    m_connections.append(peer); // add to list
    peer->moveToThread(waiter);
    
    // notify about client connect
    connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
    // stop waiter thread by indirectly raising finished() signal
    connect(peer, SIGNAL(finished()), waiter, SLOT(quit()));
    // notify about client disconnect
    connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
    // remove client from list
    connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
    // notify about finished waiter thread; only for debug purposes
    connect(waiter, SIGNAL(finished()), this, SLOT(waiterFinished()));
    // remove waiter thread when finished
    connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));

    // Queued call: start(handle) is delivered through the peer's event queue.
    QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection,
        Q_ARG(qintptr, handle));

    waiter->start();
}

for ServerPeer class

// Per-client worker object: owns one QTcpSocket and reports connect,
// disconnect and completion through signals.
class ServerPeer : public QObject
{
    Q_OBJECT

public:
    ServerPeer(QObject* = nullptr);
    ~ServerPeer();

    // Declared as an ordinary public member function, outside the slots sections.
    bool hasSocket(qintptr) const;

signals:
    void connected(qintptr);
    void disconnected(qintptr);
    // Emitted at the end of start().
    void finished();

public slots:
    // Creates the socket from the given descriptor.
    void start(qintptr);
    // Aborts (if still open) and deletes the socket.
    void disconnect();

private slots :
    void notifyConnect();
    void notifyDisconnect();

private:
    // Socket created in start(); nullptr until then.
    QTcpSocket *m_peer;
    qintptr m_id;
};

for ServerPeer(QObject*):

// Constructs a peer without a socket; m_peer stays nullptr until one is created.
ServerPeer::ServerPeer(QObject *parent)
    : QObject(parent)
    , m_peer(nullptr)
{
}

for ~ServerPeer():

// Tears the socket down through this class's own disconnect() member.
ServerPeer::~ServerPeer()
{
    this->disconnect();
}

for start(qintptr):

// Creates the socket for the given descriptor. If the descriptor cannot be
// adopted the peer schedules its own deletion and returns early; otherwise
// the disconnect handling is wired up, a connect notification is queued and
// finished() is emitted.
void ServerPeer::start(qintptr handle)
{
    qDebug() << "New waiter thread has been started.";

    m_peer = new QTcpSocket(this);
    if (!m_peer->setSocketDescriptor(handle))
    {
        this->deleteLater();
        return;
    }

    if (true /*verification here*/)
    {
        // On socket disconnect: notify first, then schedule this peer for deletion.
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));

        // manually do connected notification
        QTimer::singleShot(0, this, SLOT(notifyConnect()));
    }
    else
    {
        this->deleteLater();
    }
    
    // Reached on both branches of the verification above.
    emit finished();
}

for disconnect():

void ServerPeer::disconnect()
{
    if (m_peer != nullptr)
    {
        if (m_peer->state() != QAbstractSocket::SocketState::ClosingState
            && m_peer->state() != QAbstractSocket::SocketState::UnconnectedState)
            m_peer->abort();

        delete m_peer;
        m_peer = nullptr;
    }
}

for notifyConnect():

// Emits connected(); the argument passed here is m_peer, the socket pointer.
void ServerPeer::notifyConnect()
{
    emit connected(m_peer);
}

for notifyDisconnect():

// Emits disconnected(); the argument passed here is m_peer, the socket pointer.
void ServerPeer::notifyDisconnect()
{
    emit disconnected(m_peer);
}

for ServerGUI class

// Main widget of the third revision: owns the generated UI and a pointer to
// the server, and declares slots for client connect/disconnect notifications.
class ServerGUI : public QWidget
{
    Q_OBJECT

public:
    ServerGUI(QWidget* = nullptr);

private:
    // Generated UI, set up in the constructor.
    Ui::ServerWindow ui;
    // Server created in the constructor; reset to nullptr there if it is not ready.
    CustomServer *m_server;

private slots:
    // For further handling, e.g. updating client view
    void handleNewClient(qintptr);
    void handleRemovedClient(qintptr);
}

for ServerGUI(QWidget*):

// Sets up the UI, creates the server on localhost port 1234 (third argument
// 2) and, if the server reports ready, connects its client signals to this
// widget's slots. When the server is not ready it is deleted and the
// constructor returns without making the connections.
ServerGUI::ServerGUI(QWidget *parent) : QWidget(parent)
{
    // initialize gui elements;
    // GENERATED WITH ACCORDING *.ui FILE
    ui.setupUi(this);
    
    m_server = new WServer(QHostAddress::LocalHost, 1234, 2, this);
    if (!m_server->isReady())
    {
        qDebug() << "Server could not start!";
        delete m_server;
        m_server = nullptr;
        return;
    }
    
    connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
    connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
}

And here my main function:

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    ServerGUI w;
    
    w.show();
    return a.exec();
}

With the given code, the following messages pop up when I try to kick a (selected) client:

QMetaObject::invokeMethod: No such method ServerPeer::hasSocket(qintptr)

QObject::connect: Cannot connect (null)::destroyed() to QEventLoop::quit()

How to fix this?

Upvotes: 3

Views: 3825

Answers (1)

Antonio Dias
Antonio Dias

Reputation: 2881

If I understood correctly, you want to run each peer of your server on a separate thread, if so, then the following may help you:

  1. create a subclass of QTcpServer
  2. reimplement incomingConnection() method
  3. create an instance (without parent) of QThread and of ServerPeer and start the thread
  4. do the SIGNAL - SLOT connections to remove the peer from the list and delete the thread and the peers instances
  5. add ServerPeer to your QList
  6. once the peer has started, do the credentials verification; if you reject them, abort the connection

Edit, considerations:

You don't get the connected SIGNAL because the socket is already connected by the time you set its socketDescriptor, so you can simply assume after setSocketDescriptor that the socket is connected and do what you want.

As for the error when closing, it happens because you are not releasing the threads properly; see my edit for how you can solve that.

Finally, QTcpSocket must not be accessed from different threads; if you need to call ServerPeer from another thread, use QMetaObject::invokeMethod with QueuedConnection or BlockingQueuedConnection, or the SIGNAL/SLOT mechanism.

Edit 2:

Now the server and its peers will be deleted in MainWindow::closeEvent, and that way you can see the disconnected function being called. I guess the problem depends on the order in which the classes are deleted.

You can interact with the socket, including sending data through it, but I believe it will be less painful to use the Qt methods for cross-thread calls already mentioned. In my example you can easily write to a specific peer or to all peers.


customserver.h:

//Step 1

#include <QtCore>
#include <QtNetwork>
#include "serverpeer.h"

class CustomServer : public QTcpServer
{
    Q_OBJECT
public:
    explicit CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent = nullptr);
    ~CustomServer();

    void kickAll();
    void kickClient(qintptr id);
    void writeData(const QByteArray &data, qintptr id);
    void writeData(const QByteArray &data);

    QHostAddress localAddress();
    quint16 serverPort();
    bool isReady();

signals:
    void clientConnected(qintptr);
    void clientDisconnected(qintptr);

private slots:
    void destroyedfunc(QObject *obj);

private:
    void incomingConnection(qintptr handle);

    QList<ServerPeer*> m_connections;
    int m_maxAllowedClients;
};

customserver.cpp:

#include "customserver.h"

// Stores the client limit and starts listening on host:port.
// The base class is listed first in the initializer list because it is always
// initialized before any member, whatever order is written; listing it second
// only produces a reorder warning.
// The result of listen() is not checked here; isReady() reports whether the
// server is actually listening.
CustomServer::CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent) :
    QTcpServer(parent), m_maxAllowedClients(maxconnections)
{
    listen(host, port);
}

// Removes every remaining peer (and waits for each one) before destruction.
CustomServer::~CustomServer()
{
    this->kickAll();
}

//Step 2
// Called by QTcpServer for each new connection. Rejects the connection when
// the client limit is reached; otherwise creates a ServerPeer, moves it to a
// fresh QThread, wires notification and cleanup, and queues the peer's start().
void CustomServer::incomingConnection(qintptr handle)
{
    // handle client limit
    if (m_connections.size() >= m_maxAllowedClients)
    {
        qDebug() << "Can't allow new connection: client limit reached!";
        // Adopt the descriptor in a short-lived stack socket just to close it;
        // no heap allocation or manual delete is needed for that.
        QTcpSocket rejected;
        rejected.setSocketDescriptor(handle);
        rejected.abort();
        return;
    }

    //Step 3
    // Neither object gets a parent: the peer must be parentless to be moved to
    // another thread, and the thread deletes itself once it has finished.
    ServerPeer *peer = new ServerPeer();
    QThread *waiter = new QThread();

    peer->moveToThread(waiter);

    //Step 4
    connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
    connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
    // Peer destroyed -> stop its thread and drop it from the list.
    connect(peer, SIGNAL(destroyed()), waiter, SLOT(quit()));
    connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
    // Thread finished -> delete the thread object.
    connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));

    // Queued so that start() runs in the peer's thread once its event loop is up.
    QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection, Q_ARG(qintptr, handle));

    waiter->start();

    //Step 5
    m_connections.append(peer);
}

void CustomServer::kickAll()
{
    while (!m_connections.isEmpty())
    {
        ServerPeer *peer = m_connections.first();
        QEventLoop loop;
        connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
        QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
        loop.exec();
    }
}

// Deletes the first peer that reports owning the given id and waits until its
// thread object has been destroyed. Peers that cannot be queried are skipped.
void CustomServer::kickClient(qintptr id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        // Initialized so that a failed invokeMethod() cannot leave an
        // indeterminate value to be compared below.
        ServerPeer::State hassocket = ServerPeer::MyFALSE;
        const bool invoked = QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection, Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
        if (!invoked || hassocket != ServerPeer::MyTRUE)
            continue;

        QEventLoop loop;
        // Connect before queuing the deletion so the signal cannot be missed.
        connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
        QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
        loop.exec();
        break;
    }
}

void CustomServer::writeData(const QByteArray &data)
{
    foreach (ServerPeer *peer, m_connections)
        QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
}

// Unicast: queues a writeData(data) call on the first peer that reports
// owning the given id. Peers that cannot be queried are skipped.
void CustomServer::writeData(const QByteArray &data, qintptr id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        // Initialized so that a failed invokeMethod() cannot leave an
        // indeterminate value to be compared below.
        ServerPeer::State hassocket = ServerPeer::MyFALSE;
        const bool invoked = QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection, Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
        if (!invoked || hassocket != ServerPeer::MyTRUE)
            continue;

        QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
        break;
    }
}

// Returns the address reported by QTcpServer::serverAddress().
QHostAddress CustomServer::localAddress()
{
    const QHostAddress bound = QTcpServer::serverAddress();
    return bound;
}

// Returns the port reported by the base class. The call is qualified with
// QTcpServer:: because this function has the same name as the base one.
quint16 CustomServer::serverPort()
{
    const quint16 port = QTcpServer::serverPort();
    return port;
}

// True while the underlying QTcpServer is listening.
bool CustomServer::isReady()
{
    const bool listening = QTcpServer::isListening();
    return listening;
}

// Slot for a peer's destroyed(QObject*) signal. The pointer is only compared
// by address to drop the matching entries; it is never dereferenced.
void CustomServer::destroyedfunc(QObject *obj)
{
    m_connections.removeAll(static_cast<ServerPeer*>(obj));
}

serverpeer.h:

#include <QtCore>
#include <QtNetwork>

// Worker object for a single client connection. It owns the QTcpSocket and
// exposes its operations as slots, which CustomServer calls through
// QMetaObject::invokeMethod().
class ServerPeer : public QObject
{
    Q_OBJECT
public:
    explicit ServerPeer(QObject *parent = nullptr);
    ~ServerPeer();

    // Result type of hasSocket().
    enum State
    {
        MyTRUE,
        MyFALSE
    };

signals:
    // Emitted with the id stored in start().
    void connected(qintptr id);
    void disconnected(qintptr id);

public slots:
    // Reports whether this peer's stored id equals the given one.
    ServerPeer::State hasSocket(qintptr id);
    // Creates the socket from the descriptor and wires its signals.
    void start(qintptr handle);
    // Writes and flushes data on the socket.
    void writeData(const QByteArray &data);

private slots:
    void readyRead();
    void notifyConnect();
    void notifyDisconnect();

private:
    // Socket created in start(); nullptr before that.
    QTcpSocket *m_peer;
    // Set to the socket descriptor in start().
    qintptr m_id;
};

serverpeer.cpp:

#include "serverpeer.h"

// Constructs a peer without a socket. m_id is given the invalid-descriptor
// value -1 so that hasSocket() does not read an indeterminate value before
// start() has stored the real id.
ServerPeer::ServerPeer(QObject *parent) : QObject(parent), m_peer(nullptr), m_id(-1)
{

}

// Aborts the socket if one was created; the socket object itself was created
// with this peer as its QObject parent.
ServerPeer::~ServerPeer()
{
    if (m_peer == nullptr)
        return;

    m_peer->abort();
}

// Returns MyTRUE when the stored id equals the given id, MyFALSE otherwise.
ServerPeer::State ServerPeer::hasSocket(qintptr id)
{
    return (m_id == id) ? MyTRUE : MyFALSE;
}
// Creates the socket for the given descriptor. If the descriptor cannot be
// adopted, or verification fails, the peer schedules its own deletion;
// otherwise it stores the id, queues the connect notification and wires the
// socket's readyRead/disconnected signals.
void ServerPeer::start(qintptr handle)
{
    m_peer = new QTcpSocket(this);
    // setSocketDescriptor() reports failure through its return value; without
    // this check a dead peer would stay in the server's list and be announced
    // as connected.
    if (!m_peer->setSocketDescriptor(handle))
    {
        qWarning() << "ServerPeer: could not adopt socket descriptor" << handle << m_peer->errorString();
        this->deleteLater();
        return;
    }

    //Step 6
    if (true /*verification here*/)
    {
        m_id = handle;
        // The socket is already connected at this point, so connected() is
        // announced manually on the next event-loop pass.
        QTimer::singleShot(0, this, SLOT(notifyConnect()));
        connect(m_peer, SIGNAL(readyRead()), this, SLOT(readyRead()));
        // On disconnect: notify first, then schedule this peer for deletion.
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));
    }
    else
    {
        m_peer->abort();
        this->deleteLater();
    }
}

void ServerPeer::readyRead()
{
    qDebug() << m_peer->readAll() << QThread::currentThread();
}

void ServerPeer::writeData(const QByteArray &data)
{
    m_peer->write(data);
    m_peer->flush();
}

void ServerPeer::notifyConnect()
{
    emit connected(m_id);
}

void ServerPeer::notifyDisconnect()
{
    emit disconnected(m_id);
}

mainwindow.cpp:

// Builds the UI, registers qintptr with the meta-type system (it is passed
// through queued invocations), creates the server on localhost:1024 with a
// limit of 2 clients and wires its notifications when it is listening.
MainWindow::MainWindow(QWidget *parent)
    : QMainWindow(parent)
    , ui(new Ui::MainWindow)
{
    ui->setupUi(this);

    qRegisterMetaType<qintptr>("qintptr");

    m_server = new CustomServer(QHostAddress::LocalHost, 1024, 2, this);

    if (m_server->isReady())
    {
        connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
        connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
    }
    else
    {
        // Server is not listening: discard it and leave the signals unconnected.
        qDebug() << "Server could not start!";
        delete m_server;
        m_server = nullptr;
    }
}

// Frees the UI object allocated in the constructor.
MainWindow::~MainWindow()
{
    delete this->ui;
}

// Destroys the server (and with it all peers) when the window is closed.
void MainWindow::closeEvent(QCloseEvent *)
{
    // Deleting a null pointer is a no-op, so no explicit check is required.
    delete m_server;
    m_server = nullptr;
}

// Logs the new client, greets it individually and then announces it to all
// connected peers.
void MainWindow::handleNewClient(qintptr id)
{
    qDebug() << __FUNCTION__ << id;

    const QByteArray greeting = QString("Hello client id: %0\r\n").arg(id).toLatin1();
    const QByteArray announcement = QString("New client id: %0\r\n").arg(id).toLatin1();

    m_server->writeData(greeting, id);
    m_server->writeData(announcement);
}

// Logs the id of a client that has disconnected.
void MainWindow::handleRemovedClient(qintptr id)
{
    const qintptr removedId = id;
    qDebug() << __FUNCTION__ << removedId;
}

Upvotes: 1

Related Questions