GPrathap

Reputation: 7810

What is the correct way of constructing multiple threads to get best performance?

I want to know whether using a lambda expression to define the thread gives any performance gain. In my case, I have to run several threads in a real-time application, so I would appreciate suggestions on the optimal way of creating them. In the actual codebase, threads are created in every iteration; the example below shows, at a high level, what happens in one iteration. Since thread creation is repeated, it is an expensive operation that I would like to optimize.

  #include <iostream>
  #include <thread>
  #include <vector>
  #include <algorithm>
  #include <string>

  class Task
  {
    public:
    void execute(std::string command)
    {
      //TODO actual logic
      for(int i = 0; i < 5; i++)
      {
        std::cout<<command<<std::endl;
      }
    }
  };

  int main()
  {          
      Task* taskPtr = new Task();
      std::vector<std::thread> workers_older;
      for (int i = 0; i < 2; i++) {
          workers_older.push_back(std::thread(&Task::execute, taskPtr, "Task: without lambda expression"+ std::to_string(i)));
      }
      std::for_each(workers_older.begin(), workers_older.end(), [](std::thread &t) 
      {
          t.join();
      });

      std::vector<std::thread> workers;
      for (int i = 0; i < 2; i++) {
          workers.push_back(std::thread([taskPtr, i]()   // capture i by value: a reference would dangle/race
          {
              taskPtr->execute("Task: "+ std::to_string(i));
          }));
      }
      std::for_each(workers.begin(), workers.end(), [](std::thread &t) 
      {
          t.join();
      });
      delete taskPtr;
      return 0;
  }

EDIT: Following the valuable comments about what should be done, I have posted my solution as an answer, as suggested.

Upvotes: 1

Views: 431

Answers (3)

GPrathap

Reputation: 7810

Thank you all for the very valuable thoughts. I have decided to use a thread pool for the task. I apologize for not explaining the whole logic; it is quite long and I thought it was not required.

Here is my proposed solution. I took the initial code from here and modified it the way I wanted.

    #include <iostream>
    #include <thread>
    #include <vector>
    #include <algorithm>
    #include <boost/shared_ptr.hpp>
    #include <boost/make_shared.hpp>

    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    #include <boost/asio.hpp>
    #include <boost/move/move.hpp>
    #include <boost/make_unique.hpp>

    namespace asio = boost::asio; 

    typedef boost::packaged_task<int> task_t;
    typedef boost::shared_ptr<task_t> ptask_t;

    class Task
    {
    public:
      int execute(std::string command)
      {
        //TODO actual logic
        std::cout<< "\nThread:" << command << std::endl;
        int sum = 0;
        for(int i = 0; i < 5; i++)
        {
          sum+=i;
        }
        return sum;
      }
    };


    void push_job(Task* worker, std::string seconds, boost::asio::io_service& io_service
                , std::vector<boost::shared_future<int> >& pending_data) {
      ptask_t task = boost::make_shared<task_t>(boost::bind(&Task::execute, worker, seconds));
      boost::shared_future<int> fut(task->get_future());
      pending_data.push_back(fut);
      io_service.post(boost::bind(&task_t::operator(), task));
    }

    int main()
    {
        Task* taskPtr = new Task();

        boost::asio::io_service io_service;
        boost::thread_group threads;
        std::unique_ptr<boost::asio::io_service::work> service_work;
        service_work = boost::make_unique<boost::asio::io_service::work>(io_service);
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
        {
          threads.create_thread(boost::bind(&boost::asio::io_service::run,
            &io_service));
        }
        std::vector<boost::shared_future<int> > pending_data; // vector of futures

        push_job(taskPtr, "4", io_service, pending_data);
        push_job(taskPtr, "5", io_service, pending_data);
        push_job(taskPtr, "6", io_service, pending_data);
        push_job(taskPtr, "7", io_service, pending_data);

        boost::wait_for_all(pending_data.begin(), pending_data.end());
        int total_sum = 0;
        for(auto& result : pending_data){
           total_sum += result.get();
        }
        std::cout<< "Total sum: "<< total_sum << std::endl;

        // Let io_service::run() return and join the worker threads before exiting
        service_work.reset();
        threads.join_all();
        delete taskPtr;
        return 0;
    }

Upvotes: 0

rustyx

Reputation: 85351

The biggest overhead when working with threads comes from starting a thread, scheduling, context switching and cache utilization. The overhead of an additional indirection of a function pointer would be negligible compared to that.
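
To get a sense of the scale, here is a minimal timing sketch (my own illustration, not part of the answer) that compares the cost of starting and joining a thread with the cost of a type-erased indirect call; exact numbers vary by platform and compiler flags.

#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

int main() {
    using clock = std::chrono::steady_clock;
    constexpr int iterations = 1000;

    // Cost of starting and joining a thread that does almost nothing.
    auto t0 = clock::now();
    for (int i = 0; i < iterations; ++i)
        std::thread([] {}).join();
    auto thread_cost = clock::now() - t0;

    // Cost of an indirect call through std::function, an upper bound on the
    // extra indirection of binding a member function vs. passing a lambda.
    std::function<int(int)> f = [](int x) { return x + 1; };
    int sum = 0;
    auto t1 = clock::now();
    for (int i = 0; i < iterations; ++i)
        sum += f(i);
    auto call_cost = clock::now() - t1;

    std::cout << "thread start+join: "
              << std::chrono::duration_cast<std::chrono::microseconds>(thread_cost).count()
              << " us total for " << iterations << " threads\n";
    std::cout << "indirect calls:    "
              << std::chrono::duration_cast<std::chrono::nanoseconds>(call_cost).count()
              << " ns total for " << iterations << " calls (sum=" << sum << ")\n";
}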

Here are some points to keep in mind for optimal performance:

  • Keep a pool of N threads, where N = std::thread::hardware_concurrency() (the number of logical processors in the system)
  • Submit N-1 jobs to the pool, and run the Nth job in the calling thread. The savings from not submitting the Nth job to the pool can be significant
  • Avoid false sharing. Data written by different threads should be in different cache lines (see the sketch after this list)
  • More active threads often means a larger working set. So D-cache utilization may decrease, impacting performance
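
As promised above, a small false-sharing sketch (my own illustration, not part of the answer): each worker increments only its own counter, and padding each counter to a full cache line keeps the threads from invalidating each other's cache lines (assumes C++17 so the vector honors the over-aligned element type).

#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

// Pad each per-thread counter to a full cache line (64 bytes is a common
// size; std::hardware_destructive_interference_size can be used where the
// standard library provides it).
struct alignas(64) PaddedCounter {
    std::uint64_t value = 0;
};

int main() {
    const unsigned n = std::thread::hardware_concurrency();
    std::vector<PaddedCounter> counters(n);
    std::vector<std::thread> workers;

    for (unsigned t = 0; t < n; ++t) {
        workers.emplace_back([&counters, t] {
            // Each thread writes only to its own cache-line-sized slot,
            // so writes from different threads never share a cache line.
            for (int i = 0; i < 1000000; ++i)
                counters[t].value++;
        });
    }
    for (auto& w : workers)
        w.join();

    std::uint64_t total = 0;
    for (const auto& c : counters)
        total += c.value;
    std::cout << "total increments: " << total << "\n";
}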

Here's my working example:

#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <boost/asio.hpp>

struct thread_pool {
    thread_pool(int threads = std::thread::hardware_concurrency()) : size(threads) {
        grp.reserve(threads);
        for (int i = 0; i < threads; ++i)
            grp.emplace_back([this] { return service.run(); });
    }

    // Wrap the callable in a std::packaged_task and post it to the io_service;
    // Boost.Asio's async_result support for packaged_task makes post() return
    // the corresponding std::future.
    template<typename F, typename ...Args>
    auto enqueue(F& f, Args... args) -> std::future<decltype(f(args...))> {
        return boost::asio::post(service,
            std::packaged_task<decltype(f(args...))()>([&f, args...]{ return f(args...); })
        );
    }

    ~thread_pool() {
        service_work.reset();    // let io_service::run() return once queued work is done
        for (auto &t : grp)
            if (t.joinable())
                t.join();
        service.stop();
    }

    const int size;
private:
    boost::asio::io_service service;
    std::unique_ptr<boost::asio::io_service::work> service_work {new boost::asio::io_service::work(service)};
    std::vector<std::thread> grp;
};

int main() {
    thread_pool pool;
    std::vector<std::future<int>> results;
    auto task = [](int i) { return i + 1; };
    for (int i = 0; i < pool.size - 1; i++) {
        results.emplace_back(pool.enqueue(task, i));
    }
    int sum = task(pool.size - 1); // last task run synchronously
    for (auto& res : results) {
        sum += res.get();
    }
    std::cout << sum << std::endl;
}

Upvotes: 1

Anthony Williams

Reputation: 68591

There will be very little difference in overhead between passing the address of a member function and a set of parameters to the std::thread constructor vs passing a lambda function with appropriate captures.

The big overhead in the std::thread constructor is actually starting the thread itself.

If you know that you are going to want the same number of worker threads at multiple places in your program, it might be worth keeping them around as long-running threads with a queue of tasks.
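
A minimal sketch of that idea (my own illustration, not code from this answer): a fixed set of long-running worker threads pulling work from a shared queue guarded by a mutex and condition variable.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class WorkerQueue {
public:
    explicit WorkerQueue(unsigned n = std::thread::hardware_concurrency()) {
        for (unsigned i = 0; i < n; ++i)
            workers.emplace_back([this] { run(); });
    }

    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(m);
            tasks.push(std::move(task));
        }
        cv.notify_one();
    }

    ~WorkerQueue() {
        {
            std::lock_guard<std::mutex> lock(m);
            done = true;
        }
        cv.notify_all();
        for (auto& w : workers)
            w.join();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m);
                cv.wait(lock, [this] { return done || !tasks.empty(); });
                if (done && tasks.empty())
                    return;                 // queue drained, worker exits
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();                         // run the task outside the lock
        }
    }

    std::mutex m;
    std::condition_variable cv;
    std::queue<std::function<void()>> tasks;
    bool done = false;
    std::vector<std::thread> workers;
};

int main() {
    WorkerQueue pool;                       // threads are started once
    for (int i = 0; i < 4; ++i)
        pool.submit([i] { std::cout << "task " << i << "\n"; });
}   // destructor drains the queue and joins the workers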

Upvotes: 3
