Reputation: 60341
I'm new to std::thread and I'm trying to write a parallel_for.
I wrote the following:
// parallel_for.cpp
// compilation: g++ -O3 -std=c++0x parallel_for.cpp -o parallel_for -lpthread
// execution: time ./parallel_for 100 50000000
// (100: number of threads, 50000000: vector size)
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <vector>
#include <thread>
#include <cmath>
#include <algorithm>
#include <numeric>
#include <utility>
// Parallel for
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
std::vector<std::thread> threads;
for (Iterator it = first; it < last; it += group) {
threads.push_back(std::thread([=](){std::for_each(it, std::min(it+group, last), f);}));
}
std::for_each(threads.begin(), threads.end(), [=](std::thread& x){x.join();});
}
// Function to apply
template<typename Type>
void f1(Type& x)
{
x = std::sin(x)+std::exp(std::cos(x))/std::exp(std::sin(x));
}
// Main
int main(int argc, char* argv[]) {
const unsigned int nthreads = (argc > 1) ? std::atol(argv[1]) : (1);
const unsigned int n = (argc > 2) ? std::atol(argv[2]) : (100000000);
double x = 0;
std::vector<double> v(n);
std::iota(v.begin(), v.end(), 0);
parallel_for(v.begin(), v.end(), f1<double>, nthreads);
for (unsigned int i = 0; i < n; ++i) x += v[i];
std::cout<<std::setprecision(15)<<x<<std::endl;
return 0;
}
But this does not compile (error messages from g++ 4.6):
parallel_for.cpp: In instantiation of ‘parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]::<lambda()>’:
parallel_for.cpp:22:9: instantiated from ‘void parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]’
parallel_for.cpp:43:58: instantiated from here
parallel_for.cpp:22:89: error: field ‘parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]::<lambda()>::__f’ invalidly declared function type
How can I solve this problem?
EDIT: This new version compiles but does not give the correct result:
// parallel_for.cpp
// compilation: g++ -O3 -std=c++0x parallel_for.cpp -o parallel_for -lpthread
// execution: time ./parallel_for 100 50000000
// (100: number of threads, 50000000: vector size)
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <vector>
#include <thread>
#include <cmath>
#include <algorithm>
#include <numeric>
#include <utility>
// Parallel for
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
std::vector<std::thread> threads;
for (Iterator it = first; it < last; it += group) {
threads.push_back(std::thread([=, &f](){std::for_each(it, std::min(it+group, last), f);}));
}
std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}
// Function to apply
template<typename Type>
void f(Type& x)
{
x = std::sin(x)+std::exp(std::cos(x))/std::exp(std::sin(x));
}
// Main
int main(int argc, char* argv[]) {
const unsigned int nthreads = (argc > 1) ? std::atol(argv[1]) : (1);
const unsigned int n = (argc > 2) ? std::atol(argv[2]) : (100000000);
double x = 0;
double y = 0;
std::vector<double> v(n);
std::iota(v.begin(), v.end(), 0);
std::for_each(v.begin(), v.end(), f<double>);
for (unsigned int i = 0; i < n; ++i) x += v[i];
std::iota(v.begin(), v.end(), 0);
parallel_for(v.begin(), v.end(), f<double>, nthreads);
for (unsigned int i = 0; i < n; ++i) y += v[i];
std::cout<<std::setprecision(15)<<x<<" "<<y<<std::endl;
return 0;
}
The result is:
./parallel_for 1 100
155.524339894552 4950
The parallel version returns 4950, whereas the sequential version returns 155.524339894552. Where is the problem?
Upvotes: 5
Views: 8408
Reputation: 515
A VC11 solution; please let me know if it does not work with gcc.
template<typename Iterator, class Function>
void parallel_for( const Iterator& first, const Iterator& last, Function&& f, const size_t nthreads = std::thread::hardware_concurrency(), const size_t threshold = 1 )
{
const size_t portion = std::max( threshold, (last-first) / nthreads );
std::vector<std::thread> threads;
for ( Iterator it = first; it < last; it += portion )
{
Iterator begin = it;
Iterator end = it + portion;
if ( end > last )
end = last;
threads.push_back( std::thread( [=,&f]() {
for ( Iterator i = begin; i != end; ++i )
f(*i); // dereference: f takes an element, not an iterator
}));
}
std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}
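One caveat with using std::thread::hardware_concurrency() as the default thread count: the value is only a hint and may be 0 when it cannot be determined, in which case the division by nthreads above would divide by zero. A minimal sketch of a guard, where default_thread_count and its fallback parameter are hypothetical names introduced for illustration:

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper (not part of the answer above): returns a usable
// worker count. hardware_concurrency() is only a hint and may return 0,
// so fall back to a caller-supplied value in that case.
inline std::size_t default_thread_count(std::size_t fallback = 1)
{
    const std::size_t hint = std::thread::hardware_concurrency();
    return hint != 0 ? hint : fallback;
}
```

The result could then be passed as the nthreads argument instead of calling hardware_concurrency() directly.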
Upvotes: 0
Reputation: 33651
You must capture functions by reference.
[=, &f] () { /* your code */ };
There is also a second issue. Look at this code:
#include <iostream>
template <class T>
void foo(const T& t)
{
const int a = t;
[&]
{
std::cout << a << std::endl;
}();
}
int main()
{
foo(42);
return 0;
}
clang prints 42, but g++ emits the warning "‘a’ is used uninitialized in this function" and prints 0. This looks like a compiler bug.
Workaround: use const auto (for the variable group in your code).
UPD: I think this is the bug in question: http://gcc.gnu.org/bugzilla/show_bug.cgi?id=52026
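As a rough sketch of the two suggestions together (capture f by reference, declare the captured constant with const auto), something like the following should work. The names apply_via_lambda and add_one are hypothetical and exist only for this illustration; the lambda is called directly rather than on a thread to keep the example small.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical helper: applies f to [first, last) through a lambda that
// captures f by reference and everything else by value.
template <typename Iterator, typename Function>
void apply_via_lambda(Iterator first, Iterator last, Function&& f)
{
    // const auto instead of a spelled-out type, per the workaround above.
    const auto count = last - first;
    // [=, &f]: copy first and count, but refer to f. When Function is
    // deduced as a reference to a function, f cannot be copied into the closure.
    auto task = [=, &f]() { std::for_each(first, first + count, f); };
    task();
}

// Hypothetical element function used only for this example.
inline void add_one(int& x) { x += 1; }
```

Calling apply_via_lambda(v.begin(), v.end(), add_one) on a std::vector<int> increments every element in place.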
Upvotes: 1
Reputation: 27038
You need a cast or type conversion at (last-first). The reason is that implicit conversions are never considered during template argument deduction, so both arguments of std::max must already have the same type.
This works just fine (also fixing the problem DeadMG and Ben Voigt found). Both versions give 156608294.151782 with n=100000000.
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
const unsigned int group = std::max(std::max(ptrdiff_t(1), ptrdiff_t(std::abs(threshold))), ((last-first))/std::abs(nthreads));
std::vector<std::thread> threads;
threads.reserve(nthreads);
Iterator it = first;
for (; it < last-group; it += group) {
threads.push_back(std::thread([=,&f](){std::for_each(it, std::min(it+group, last), f);}));
}
std::for_each(it, last, f); // last steps while we wait for other threads
std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}
Since the final step for_each(it, last, f) is no larger than the others, we may as well use the calling thread to finish it while waiting for the other threads.
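For reference, here is a self-contained sketch of the same idea: worker threads take the full chunks and the calling thread takes the tail. It is a variation, not the exact code above. The name parallel_for_sketch is hypothetical, f is taken by value, non-positive nthreads or threshold values are clamped to 1 (the original uses std::abs instead), and the loop tests the remaining distance last - it rather than forming last - group.

```cpp
#include <algorithm>
#include <iterator>
#include <thread>
#include <vector>

// Hypothetical variant of parallel_for: full chunks run on worker threads,
// the remaining tail runs on the calling thread. Requires random-access iterators.
template <typename Iterator, typename Function>
void parallel_for_sketch(Iterator first, Iterator last, Function f,
                         int nthreads = 1, int threshold = 1000)
{
    typedef typename std::iterator_traits<Iterator>::difference_type diff_t;
    const diff_t total = last - first;
    if (total <= 0) return;

    // Assumption: values below 1 are clamped to 1.
    const diff_t workers = std::max<diff_t>(1, nthreads);
    const diff_t min_chunk = std::max<diff_t>(1, threshold);
    const diff_t group = std::max(min_chunk, total / workers);

    std::vector<std::thread> threads;
    Iterator it = first;
    // Spawn a worker only while more than one chunk remains, so that
    // it + group never goes past last.
    for (; last - it > group; it += group) {
        const Iterator chunk_end = it + group;
        threads.emplace_back([it, chunk_end, &f]() {
            std::for_each(it, chunk_end, f);
        });
    }
    std::for_each(it, last, f);               // tail on the calling thread
    for (std::thread& t : threads) t.join();  // f outlives every worker
}
```

Note that this sketch is not exception-safe: if f throws on the calling thread, the unjoined std::thread objects are destroyed and std::terminate is called.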
Upvotes: 5
Reputation: 27038
You need to capture by reference, and you need a cast or type conversion at (last-first). The reason is that implicit conversions are never considered during template argument deduction, so both arguments of std::max must already have the same type.
Also, fix the problem DeadMG found, and you end up with the code below.
It works just fine, both versions give 156608294.151782 with n=100000000.
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
const unsigned int group = std::max(std::max(ptrdiff_t(1), ptrdiff_t(std::abs(threshold))), ((last-first))/std::abs(nthreads));
std::vector<std::thread> threads;
Iterator it = first;
for (; it < last-group; it += group) {
threads.push_back(std::thread([=,&f](){std::for_each(it, std::min(it+group, last), f);}));
}
std::for_each(it, last, f); // use calling thread while we wait for the others
std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}
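To illustrate the deduction point in isolation: std::max(1, last - first) does not compile on platforms where ptrdiff_t is not int, because the single template parameter T would have to be both int and ptrdiff_t. A small sketch of the chunk-size computation, with chunk_size as a hypothetical helper name, that sidesteps this by naming the template argument explicitly:

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper: computes the per-thread chunk size.
// Naming the template argument (std::max<std::ptrdiff_t>) converts the int
// arguments up front instead of relying on deduction, which performs no
// implicit conversions. Values below 1 are clamped to 1 (an assumption of
// this sketch; the code above uses std::abs instead).
inline std::ptrdiff_t chunk_size(std::ptrdiff_t total, int nthreads, int threshold)
{
    const std::ptrdiff_t workers = std::max<std::ptrdiff_t>(1, nthreads);
    const std::ptrdiff_t min_chunk = std::max<std::ptrdiff_t>(1, threshold);
    return std::max(min_chunk, total / workers);  // both arguments are ptrdiff_t
}
```

For example, 100 elements over 4 threads with a threshold of 10 gives chunks of 25, while a threshold of 1000 gives a single chunk of size 1000.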
Upvotes: 0
Reputation: 283624
One problem is that it += group may legally land exactly on last, but moving it beyond the end is undefined behavior. Merely checking it < last afterwards is too late to fix that.
You need to instead test last - it while it is still valid. (Neither it + group nor last - group is certain to be safe, although the latter should be, due to the way group is calculated.)
For example:
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function f, const int nthreads = 1, const int threshold = 100)
{
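// ptrdiff_t throughout, so std::max sees matching types and last - it > group is a signed comparison
const std::ptrdiff_t group = std::max(std::max(std::ptrdiff_t(1), std::ptrdiff_t(std::abs(threshold))), (last-first)/std::abs(nthreads));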
std::vector<std::thread> threads;
threads.reserve(nthreads);
Iterator it = first;
for (; last - it > group; it += group) {
threads.push_back(std::thread([=, &f](){std::for_each(it, it+group, f);}));
}
threads.push_back(std::thread([=, &f](){std::for_each(it, last, f);}));
std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}
Upvotes: 1
Reputation: 146910
You give std::min(it+group, last) to std::for_each, but the loop always adds group to it at the end of each iteration. This means that if the distance from it to last is not a multiple of group, you will move it past last, which is UB.
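One way to avoid stepping past the end is to clamp each step to the remaining distance before advancing. The sketch below shows only the boundary arithmetic, on indices rather than iterators; chunk_bounds is a hypothetical helper name, and each resulting pair (b, e) would correspond to the iterator range first + b to first + e.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: splits [0, total) into half-open index ranges of at
// most `group` elements. The step is clamped to the remaining distance, so
// the running position never exceeds total.
inline std::vector<std::pair<std::ptrdiff_t, std::ptrdiff_t>>
chunk_bounds(std::ptrdiff_t total, std::ptrdiff_t group)
{
    std::vector<std::pair<std::ptrdiff_t, std::ptrdiff_t>> bounds;
    if (group <= 0) return bounds;  // nothing sensible to do
    std::ptrdiff_t begin = 0;
    while (begin < total) {
        const std::ptrdiff_t step = std::min(group, total - begin);
        bounds.emplace_back(begin, begin + step);
        begin += step;  // lands exactly on total at the end
    }
    return bounds;
}
```

With total = 10 and group = 4 this yields three ranges, the last one covering only the final two elements.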
Upvotes: 0