NullAchtFuffZehn
NullAchtFuffZehn

Reputation: 88

QtConcurrent::map shows no benefit

I want to manipulate a QVector using the QtConcurrent::map function. All my sample program does is to increment all values in a QVector by 1.

// Benchmark: increment 10M doubles 100 times with three strategies and
// compare wall-clock times (QElapsedTimer reports milliseconds).
QVector<double> arr(10000000, 0);
QElapsedTimer timer;
qDebug() << QThreadPool::globalInstance()->maxThreadCount() << "Threads";

int end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
// Strategy 1: single-threaded std::transform (reads each element, writes it back).
timer.start();
for(int i = 0; i < 100; ++i) {
    std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
// Strategy 2: single-threaded std::for_each with an in-place increment.
timer.start();
for(int i = 0; i < 100; ++i) {
    std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
// Strategy 3: QtConcurrent::map over the global thread pool.
// waitForFinished() inside the loop makes each of the 100 iterations a full
// fork/join cycle, so per-call scheduling overhead is paid 100 times.
// NOTE(review): at ~1 add per 8 bytes the work per element is tiny, so this
// is presumably memory-bandwidth bound rather than CPU bound — which would
// explain why extra threads barely help.
timer.start();
for(int i = 0; i < 100; ++i) {
    QFuture<void> qf = QtConcurrent::map(arr.begin(), arr.end(), [](double &x){ ++x; });
    qf.waitForFinished();
}
end = timer.elapsed();
qDebug() << end;

However the program outputs

4 Threads
905 // std::transform
886 // std::for_each
876 // QtConcurrent::map

so there is almost no speed benefit with the multithreaded version. I verified that there are actually 4 threads running. I used -O2 optimization. Is the more common QThreadPool approach better suited for this situation?

EDIT:

I tried a different method using QtConcurrent::run(). Here are the relevant parts of the program code:

// Increment every element in the half-open iterator range [first, last) by 1.
void add1(QVector<double>::iterator first, QVector<double>::iterator last) {
    while (first != last) {
        *first++ += 1;
    }
}

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
// Single-threaded baseline: increment every element once.
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
// Split the n elements into numThreads contiguous chunks and hand each chunk
// to QtConcurrent::run.
// NOTE(review): `QFuture<void> qf[numThreads]` is a variable-length array — a
// compiler extension, not standard C++ (unless numThreads is a constant).
QFuture<void> qf[numThreads];
for(int j = 0; j < numThreads; ++j) {
    // The chunk is the half-open range [j*n/numThreads, (j+1)*n/numThreads).
    // The original code subtracted 1 from the end iterator, which silently
    // skipped the last element of every chunk (add1 stops at `first != last`,
    // so `last` must be one-past-the-end, not the last element).
    qf[j] = QtConcurrent::run(add1, arr.begin()+j*n/numThreads, arr.begin()+(j+1)*n/numThreads);
}
for(int j = 0; j < numThreads; ++j) {
    qf[j].waitForFinished();
}

So I manually distribute the task over different threads. But still I hardly get a performance boost:

181 ms // std::for_each
163 ms // QtConcurrent::run

What's still wrong here?

Upvotes: 3

Views: 757

Answers (1)

Yakk - Adam Nevraumont
Yakk - Adam Nevraumont

Reputation: 275555

It appears QtConcurrent has a high overhead compared to simply using C++ threading primitives and roll-your-own-thread-pools.

// A minimal blocking FIFO work queue shared between producer and consumer
// threads.
//
//  - push_back() enqueues an item and wakes one waiting consumer.
//  - pop_front() blocks until an item is available (or the queue has been
//    terminated) and returns an empty optional once terminate() was called.
//  - terminate() wakes all waiters and discards pending items; the destructor
//    calls it so consumers blocked in pop_front() are released on teardown.
//
// Changes vs. the original: uses std::optional instead of boost::optional
// (drops the only third-party dependency), and pop_front() now really pops
// from the FRONT — the original popped data.back(), which made the "queue"
// behave as a LIFO stack despite its name and API.
template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;

  // Enqueue a value; safe to call from any thread.
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    // Notify outside the lock so the woken consumer doesn't immediately
    // block on the mutex we still hold.
    cv.notify_one();
  }

  // Block until a value is available or terminate() was called.
  // Returns an empty optional after termination.
  std::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.front());  // FIFO: oldest element first
    data.pop_front();
    return r;
  }

  // Wake every waiter and drop all queued items.  Irreversible.
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }

  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;                // guards data and abort
  std::deque<T> data;
  std::condition_variable cv;  // signaled on push and on terminate
  bool abort = false;          // once true, pop_front() always returns empty
};
// A minimal thread pool built on std::async workers draining a shared
// threaded_queue of std::packaged_task<void()>.  Callers get results back
// through the std::future returned by queue_task()/run_task().
// Member declaration order matters for shutdown: members are destroyed in
// reverse order, so `tasks` is terminated (releasing blocked workers) before
// the `threads` futures are destroyed, and each std::async future blocks in
// its destructor until its worker thread has exited.
struct thread_pool {
  thread_pool( std::size_t n = 1 ) { start_thread(n); }
  // Non-movable: worker lambdas capture `this`.
  thread_pool( thread_pool&& ) = delete;
  thread_pool& operator=( thread_pool&& ) = delete;
  ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
  // Wrap `task` in a packaged_task, enqueue it, and return the future for
  // its result.  R is deduced from invoking F with no arguments.
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R()> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  // Like queue_task(), but grows the pool by one thread if every existing
  // thread currently appears busy.
  // NOTE(review): `active` is only incremented after a worker pops a task,
  // so this check can race with workers between pop and increment and may
  // occasionally spawn a thread that wasn't strictly needed (benign).
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> run_task( F task ) {
    if (threads_active() >= total_threads()) {
      start_thread();
    }
    return queue_task( std::move(task) );
  }
  // Abort the queue: pending (not yet started) tasks are dropped and all
  // idle workers wake up and exit their loops.
  void terminate() {
    tasks.terminate();
  }
  // Number of workers currently executing a task (approximate; see run_task).
  std::size_t threads_active() const {
    return active;
  }
  // Number of worker threads ever started.
  std::size_t total_threads() const {
    return threads.size();
  }
  // Terminate, then join every worker (future destructors block).
  void clear_threads() {
    terminate();
    threads.clear();
  }
  // Launch n worker threads.  Each worker loops, popping tasks until the
  // queue is terminated (pop_front then returns an empty value).
  void start_thread( std::size_t n = 1 ) {
    while(n-->0) {
      threads.push_back(
        std::async( std::launch::async,
          [this]{
            while(auto task = tasks.pop_front()) {
              ++active;
              try{
                (*task)();
              } catch(...) {
                // Keep the busy count accurate even if the task throws;
                // rethrow so the exception surfaces via the worker future.
                --active;
                throw;
              }
              --active;
            }
          }
        )
      );
    }
  }
private:
  std::vector<std::future<void>> threads;               // one future per worker
  threaded_queue<std::packaged_task<void()>> tasks;     // pending work
  std::atomic<std::size_t> active = {};                 // workers mid-task
};

// Simple stopwatch for benchmarking: start(), finish(), then read ms().
//
// Uses std::chrono::steady_clock instead of the original
// high_resolution_clock: steady_clock is guaranteed monotonic, while
// high_resolution_clock may be an alias of system_clock and can jump when
// the wall clock is adjusted, corrupting interval measurements.
struct my_timer_t {
    std::chrono::steady_clock::time_point first{};   // set by start()
    // Zero-initialized so ms() is well-defined even before finish() is
    // called (the original left it indeterminate).
    std::chrono::steady_clock::duration duration{};

    // Record the starting instant.
    void start() {
        first = std::chrono::steady_clock::now();
    }
    // Stop the clock: store and return the time elapsed since start().
    std::chrono::steady_clock::duration finish() {
        return duration = std::chrono::steady_clock::now()-first;
    }
    // Elapsed time of the last start()/finish() pair, in whole milliseconds.
    unsigned long long ms() const {
        return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    }
};
// Benchmark driver: increments 1M doubles 100 times with three strategies
// (std::transform, std::for_each, and an 8-way thread_pool split) and prints
// the elapsed milliseconds for each.  A random element is printed alongside
// so the optimizer cannot discard the work.
int main() {
    std::vector<double> arr(1000000, 0);
    my_timer_t timer;
    constexpr int repeats = 100;

    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    // Baseline 1: single-threaded std::transform.
    timer.start();
    for (int rep = 0; rep < repeats; ++rep) {
        std::transform(arr.begin(), arr.end(), arr.begin(),
                       [](double v) { return ++v; });
    }
    timer.finish();
    const auto time_transform = timer.ms();
    std::cout << time_transform << "<- std::transform (" << arr[rand()%arr.size()] << ")\n";
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    // Baseline 2: single-threaded std::for_each with in-place increment.
    timer.start();
    for (int rep = 0; rep < repeats; ++rep) {
        std::for_each(arr.begin(), arr.end(), [](double &v) { ++v; });
    }
    timer.finish();
    const auto time_for_each = timer.ms();
    std::cout << time_for_each << "<- std::for_each (" << arr[rand()%arr.size()] << ")\n";
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    // Parallel: split the vector into num_threads equal contiguous chunks
    // and run one for_each per chunk on the pool, joining after each pass.
    enum { num_threads = 8 };
    thread_pool pool(num_threads);
    timer.start();
    for (int rep = 0; rep < repeats; ++rep) {
        std::array<std::future<void>, num_threads> chunks;
        const std::size_t per_thread = arr.size() / num_threads;
        for (int t = 0; t < num_threads; ++t) {
            chunks[t] = pool.run_task([&arr, per_thread, t] {
                auto begin = arr.begin() + per_thread * t;
                std::for_each(begin, begin + per_thread, [](double &v) { ++v; });
            });
        }
        for (auto &c : chunks) {
            c.wait();
        }
    }
    timer.finish();
    const auto time_pool = timer.ms();
    std::cout << time_pool << "<- thread_pool (" << arr[rand()%arr.size()] << ")\n";
}

Live example.

This generates:

153<- std::transform (100)
131<- std::for_each (200)
82<- thread_pool (300)

a significant speedup when using a simple C++11 thread pool to split tasks 8 ways. (It was about 105 when splitting tasks 4 ways).

Now I did use a test set 10 times smaller than yours, as the online system timed out when my program took that long to run.

There is going to be overhead communicating with your thread pool system, but my naive thread pool shouldn't be outperforming a real library like this.

Now, a serious problem is that you might be memory-bandwidth bound; adding threads that touch bytes faster won't help if they all have to wait for the same memory bus.

Upvotes: 4

Related Questions