Anon
Anon

Reputation: 2492

How can you add to a QStringList being operated on by QtConcurrent::map?

I'm trying to create an application that will perpetually monitor a folder, and add any new files to a queue for multi-threaded processing.

This is what I have:

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    QDir dir("/home/boat/Programming/");
    QStringList folders = QStringList() << "/home/boat/Programming/";
    QStringList filters = QStringList() << "*.boat";
    QStringList boatFileNames = dir.entryList(filters);

    /* Start Threading | Files from list are deleted on completion */
    QtConcurrent::map(boatFileNames, &someFunction);

    /* Monitor folder for new files to add to queue */
    QFileSystemWatcher fsw(folders);
    QObject::connect(&fsw,&QFileSystemWatcher::directoryChanged,[&](){
            ????
    });
    a.exec();
}

The only thing I need to ensure for thread safety is that I can not have one file being operated on by two threads of the same function.

The way I see it, I have two options, neither which I know how to do:

  1. Wait until the map is finished, check folder again for files, repopulate QStringList, and reinitiate map.
  2. Add to the QStringList feeding the map, assuming that the map never closes, and that you can just keep adding items to the queue and it will operate the function on them.

Are either of these possible, or is there another way I should be going about this?

Upvotes: 0

Views: 388

Answers (1)

Alexander V
Alexander V

Reputation: 8718

How can you add to a QStringList being operated on by QtConcurrent::map? I'm trying to create an application that will perpetually monitor a folder, and add any new files to a queue for multi-threaded processing.

I would not even use QtConcurrent::map because the task can be implemented with simple stack- or queue- like container protected by the mutex.

Connect all the worker (consumer) threads to own signal, say void MyClass:: newFileAvailable().

Make void QFileSystemWatcher::directoryChanged(const QString &path) signal to populate the queue-like (or you can make it stack-like) structure:

class FilePathQueue
{
public:
   void push(const QString& path)
   {
      QMutexLocker lock(&mutex);
      list.push_back(path);
   }
   QString pop()
   {
      QMutexLocker lock(&mutex);
      if (list.isEmpty())
         return QString();
      QString ret = list.front();
      list.pop_front();
      return ret;
   }
private:
   QMutex mutex;
   QStringList list;
};

So, all the worker threads should subscribe to newFileAvailable() signal and will consume the new file path via FilePathQueue::pop() with checking if there any items available left.

/* File path thread-safe container */
FilePathQueue fileList;

/* Monitor folder for new files to add to queue */
QFileSystemWatcher fsw(folders);
QObject::connect(&fsw, &QFileSystemWatcher::directoryChanged,
   [&](const QString& path){
        fileList.push(path);     // push in new path
        emit newFileAvailable(); // let workers know
        // TODO: here we can have condition.notify_all() or notify_one()
        // instead of signal emit for the higher performance
});

// launch new workers
for(int n=0; n<8; ++n)
{
   Worker* worker = new Worker(this, fileList);  // create worker and
   worker->start(); // starts own thread etc.

   connect(this, &MyClass::newFileAvailable,     // connect it to new
           worker, &MyWorker::processNewFile);   // file signal
}

Worker class:

// the worker thread supposed to be running in own thread
// see QObject::moveToThread()
class MyWorker : public QObject
{
   public:
      MyWorker(QObject* parent, FilePathQueue& list) :
         QObject(parent), fileList(list) {}
   ///
   public slots:
      void processNewFile()
      {
          QString path = fileList.pop();
          if (path.isEmpty()) return;

          process(path);
      }

      void process(const QString& path);
   private:
      FilePathQueue& fileList;
};

This is done simple with Qt and very manageable with, say, thousands file arrival signals per second per several workers, but for the higher rate with more consumers or more of real-time performance system QWaitCondition::wait() should be exploited on worker thread side while the publisher will do QWaitCondition::notify_one() or notify_all() depending on the logic. To implement that logic the more elaborated worker thread example needs to be presented.

Upvotes: 2

Related Questions