I'm really unsure how to address this problem, so I will explain it first. I need to run a number of threads that each connect to some application via a TCP socket; so far, no problem. The application is quite time-consuming, which is why I want to run it in parallel, with one thread per instance to communicate with it. Once a computation is finished, I want to send the results to another thread that collects them. Therefore I wrote a Worker class:
class Worker : public QObject {
Q_OBJECT
public:
Worker();
Worker(int port);
~Worker();
QTcpSocket* sock;
void insert();
public slots:
void connect();
void process(const int &id, const QString &param, const int &arity);
signals:
void ready();
void finished(const int &id, const int &consistent, const QString &result);
void error(QString err);
};
My approach so far is another thread, superThread, which is supposed to work through a huge file, spread the work across the worker threads, and then receive and handle the results. It is set up in main() as follows:
QThread* superThread = new QThread();
supWorker* super = new supWorker();
for (int i = 0; i < nrWorkers; i++){
Worker* worker = new Worker(portRange+i);
QThread* workerThread = new QThread();
QThread::connect(workerThread, SIGNAL(started()), worker, SLOT(connect()));
worker->moveToThread(workerThread);
workerThread->start();
QThread::connect(super, SIGNAL(process(int, QString, int)), worker, SLOT(process(int,QString,int)));
QThread::connect(worker, SIGNAL(finished(int, int, QString)), super, SLOT(handleResult(int, int, QString)));
}
The obvious problem with this approach is that the signal is sent to all connected threads. What I want is for superThread to send the arguments to only one of the threads. How can I set up the connection so that only one of the worker threads receives the signal?
Any help or architectural ideas much appreciated, thanks in advance.
Reputation: 11934
Not sure if I got the idea 100% right, but why not pass the array of worker threads to the super thread, keep an index that denotes the index of the currently active thread, connect signals only to that one, dispatch the signals when needed, wait for completion, disconnect the signals, advance the index and repeat? If dispatching serialized signals to the threads is what you really want this could work.
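To make the index idea above concrete without any Qt machinery, here is a minimal sketch in standard C++11. It swaps signal connections for one private inbox per worker, so a job posted to the inbox at the active index can only be seen by that worker. The names `Inbox` and `dispatchRoundRobin` are hypothetical, and incrementing a counter stands in for the real computation.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical per-worker inbox: only one worker ever waits on it.
struct Inbox
{
    std::mutex mutex;
    std::condition_variable wake;
    std::queue<int> jobs;   // guarded by mutex
    bool stop = false;      // guarded by mutex
};

// Hands jobCount jobs to workerCount workers in round-robin order and
// returns how many jobs each worker handled.
std::vector<int> dispatchRoundRobin(int workerCount, int jobCount)
{
    assert(workerCount > 0 && jobCount >= 0);

    std::vector<Inbox> inboxes(workerCount);
    std::vector<int> handled(workerCount, 0);  // slot i is written by worker i only

    std::vector<std::thread> threads;
    for (int i = 0; i < workerCount; ++i) {
        threads.emplace_back([&inboxes, &handled, i] {
            Inbox& inbox = inboxes[i];
            for (;;) {
                std::unique_lock<std::mutex> lock(inbox.mutex);
                inbox.wake.wait(lock, [&inbox] { return inbox.stop || !inbox.jobs.empty(); });
                if (inbox.jobs.empty())
                    return;          // stop requested and inbox drained
                inbox.jobs.pop();
                lock.unlock();
                ++handled[i];        // stand-in for the real computation
            }
        });
    }

    // Master: post each job to the active worker only, then advance the index.
    int active = 0;
    for (int job = 0; job < jobCount; ++job) {
        Inbox& inbox = inboxes[active];
        {
            std::lock_guard<std::mutex> lock(inbox.mutex);
            inbox.jobs.push(job);
        }
        inbox.wake.notify_one();
        active = (active + 1) % workerCount;
    }

    // Ask every worker to finish its remaining jobs and exit.
    for (Inbox& inbox : inboxes) {
        {
            std::lock_guard<std::mutex> lock(inbox.mutex);
            inbox.stop = true;
        }
        inbox.wake.notify_one();
    }
    for (std::thread& thread : threads)
        thread.join();
    return handled;
}
```

With 3 workers and 10 jobs, `dispatchRoundRobin(3, 10)` returns the counts 4, 3, 3, because the index wraps around after every third job.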
EDIT
OK, I actually pulled myself together to make a Qt-based sample that implements the needed workflow and put it on GitHub.
QtThreadExample.hpp:
#pragma once
#include <QThread>
#include <QApplication>
#include <QMetaType>
#include <QTimer>
#include <vector>
#include <memory>
#include <cstdio>
#include <algorithm>
struct Work
{
int m_work;
};
struct Result
{
int m_result;
int m_workerIndex;
};
Q_DECLARE_METATYPE(Work);
Q_DECLARE_METATYPE(Result);
class Worker : public QThread
{
Q_OBJECT
public:
Worker(int workerIndex) : m_workerIndex(workerIndex)
{
moveToThread(this);
connect(this, SIGNAL(WorkReceived(Work)), SLOT(DoWork(Work)));
printf("[%d]: worker %d initialized\n", reinterpret_cast<int>(currentThreadId()), workerIndex);
}
void DispatchWork(Work work)
{
emit WorkReceived(work);
}
public slots:
void DoWork(Work work)
{
printf("[%d]: worker %d received work %d\n", reinterpret_cast<int>(currentThreadId()), m_workerIndex, work.m_work);
msleep(100);
Result result = { work.m_work * 2, m_workerIndex };
emit WorkDone(result);
}
signals:
void WorkReceived(Work work);
void WorkDone(Result result);
private:
int m_workerIndex;
};
class Master : public QObject
{
Q_OBJECT
public:
Master(int workerCount) : m_workerCount(workerCount), m_activeWorker(0), m_timer(nullptr) // same order as the member declarations
{
printf("[%d]: creating master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
}
~Master()
{
std::for_each(m_workers.begin(), m_workers.end(), [](std::unique_ptr<Worker>& worker)
{
worker->quit();
worker->wait();
});
}
public slots:
void Initialize()
{
printf("[%d]: initializing master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
for (int workerIndex = 0; workerIndex < m_workerCount; ++workerIndex)
{
auto worker = new Worker(workerIndex);
m_workers.push_back(std::unique_ptr<Worker>(worker));
connect(worker, SIGNAL(WorkDone(Result)), SLOT(WorkDone(Result)));
worker->start();
}
m_timer = new QTimer();
m_timer->setInterval(500);
connect(m_timer, SIGNAL(timeout()), SLOT(GenerateWork()));
m_timer->start();
}
void GenerateWork()
{
Work work = { m_activeWorker };
printf("[%d]: dispatching work %d to worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), work.m_work, m_activeWorker);
m_workers[m_activeWorker]->DispatchWork(work);
m_activeWorker = (m_activeWorker + 1) % static_cast<int>(m_workers.size());
}
void WorkDone(Result result)
{
printf("[%d]: received result %d from worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), result.m_result, result.m_workerIndex);
}
void Terminate()
{
m_timer->stop();
delete m_timer;
}
private:
int m_workerCount;
std::vector<std::unique_ptr<Worker>> m_workers;
int m_activeWorker;
QTimer* m_timer;
};
QtThreadExample.cpp:
#include "QtThreadExample.hpp"
#include <QTimer>
int main(int argc, char** argv)
{
qRegisterMetaType<Work>("Work");
qRegisterMetaType<Result>("Result");
QApplication application(argc, argv);
QThread masterThread;
Master master(5);
master.moveToThread(&masterThread);
master.connect(&masterThread, SIGNAL(started()), SLOT(Initialize()));
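// finished() is emitted once the thread's event loop exits after quit(); terminated() fired only after QThread::terminate() and no longer exists in Qt 5
master.connect(&masterThread, SIGNAL(finished()), SLOT(Terminate()));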
masterThread.start();
// Set a timer to terminate the program after 10 seconds
QTimer::singleShot(10 * 1000, &application, SLOT(quit()));
application.exec();
masterThread.quit();
masterThread.wait();
printf("[%d]: master thread has finished\n", reinterpret_cast<int>(QThread::currentThreadId()));
return 0;
}
In general, the solution is not to emit a signal from the master thread itself but to emit the signal from the worker thread object. This way you get a unique signal for each thread, can process the work asynchronously in that thread's event loop, and can then emit a signal back when the work is done. The sample can and should be refactored to your needs, but it demonstrates a producer/consumer pattern in Qt using the idea of an index and signaling the threads one by one. I am using a timer to generate work in the master thread (Master::m_timer); I guess in your case you will be generating work from a signal coming from a socket, a file or something else. Then I call a method on the active worker thread; the method emits a signal into the worker thread's event loop to start the work, and the worker emits another signal on completion. That is a general description: look at the sample, try it out, and if you have any follow-up questions let me know.
I guess this works fairly nicely if you are using Qt objects, but for a traditional producer/consumer pattern the signal/slot machinery actually makes life a bit harder. A standard C++11 pipeline with a std::condition_variable, where the master thread calls condition_variable::notify_one() and the worker threads simply wait on the condition variable, would be easier. However, Qt has nice wrappers for all the I/O stuff, so just try this out and decide.
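For comparison, the condition-variable approach mentioned above might look roughly like the sketch below. It is not taken from the sample: `runPipeline` is a hypothetical helper, doubling the job value stands in for the real computation (as in the Qt example), and the results are simply summed. Unlike the index-based dispatch, any idle worker may pick up the next job from the shared queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: starts workerCount threads that each double the jobs
// they take from a shared queue, and returns the sum of all results.
long runPipeline(int workerCount, int jobCount)
{
    assert(workerCount > 0 && jobCount >= 0);

    std::mutex mutex;
    std::condition_variable workAvailable;
    std::queue<int> jobs;   // pending work, guarded by mutex
    bool done = false;      // set once the master has queued everything
    long total = 0;         // collected results, guarded by mutex

    auto worker = [&]() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex);
            // Sleep until there is a job or the master has signalled shutdown.
            workAvailable.wait(lock, [&] { return done || !jobs.empty(); });
            if (jobs.empty())
                return;             // done and nothing left to process
            int job = jobs.front();
            jobs.pop();
            lock.unlock();          // compute without holding the lock

            int result = job * 2;

            lock.lock();
            total += result;        // the lock is released at the end of the iteration
        }
    };

    std::vector<std::thread> workers;
    for (int i = 0; i < workerCount; ++i)
        workers.emplace_back(worker);

    // Master: queue each job and wake exactly one waiting worker.
    for (int job = 1; job <= jobCount; ++job) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            jobs.push(job);
        }
        workAvailable.notify_one();
    }

    {
        std::lock_guard<std::mutex> lock(mutex);
        done = true;
    }
    workAvailable.notify_all();     // let every worker observe the shutdown flag

    for (std::thread& thread : workers)
        thread.join();
    return total;
}
```

For example, `runPipeline(4, 100)` doubles the values 1 to 100 across four threads and returns 10100. The predicate passed to `wait()` guards against spurious wake-ups and against notifications that arrive before a worker starts waiting.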
Below is the sample output of the example, I guess the thread logging indicates that the needed effect is achieved:
One more note: since QApplication runs an event loop itself, if you do not have a GUI you can let all your I/O objects and the master class live in the main thread and emit signals from there, eliminating the need for a separate master thread. Of course, if you have a GUI you wouldn't want to burden it with this stuff.