Reputation: 93
So I was looking up how to do some parallelism just using stl c++ stuff and found the following bit of code on another question here in Stack Overflow
template <typename RAIter> //FOUND ON STACK OVERFLOW
int parallel_sum(RAIter beg, RAIter end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RAIter mid = beg + len / 2;
auto handle = std::async(std::launch::async,
parallel_sum<RAIter>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
I wanted to make a general parallel_for_each function that loops over a (hopefully) arbitrary container type and applies an algorithm to each entry so I modified the above to the following:
template <typename ContainerIterator, typename containerSizeType, typename AlgorithmPerEntry> //modified version of parallel sum code above : https://stackoverflow.com/questions/36246300/parallel-loops-in-c
void parallel_for_each(ContainerIterator beg, ContainerIterator end, AlgorithmPerEntry& algorithm, containerSizeType maxProbSize)
{
containerSizeType len = end - beg;
if (len < maxProbSize){//if you are sufficiently small, go ahead and execute
std::for_each(beg, end, algorithm);
std::cout << "working on processor with id = " << GetCurrentProcessorNumber() << std::endl;//the processor id's change so I'm assuming this is executing in parallel
return;
}
//otherwise, continue spawning more threads
ContainerIterator mid = beg + len / 2;
auto handle = std::async(std::launch::async,
parallel_for_each<ContainerIterator, containerSizeType, AlgorithmPerEntry>, mid, end, algorithm, maxProbSize);
parallel_for_each(beg, mid, algorithm, maxProbSize);
handle.get(); //corrected as advised
}
I wanted to test is with a super simple functor so I made the following:
template<typename T>
struct dataSetter
{
const T& set_to;
dataSetter(const T& set_to_in) : set_to(set_to_in){}
void operator()(T& set_this)
{
set_this = set_to;
}
};
Pretty straight forward, just sets the value of some arg into its operator()
Here's my main function's body
std::vector<int> ints(100000);
unsigned minProbSize = 1000;
int setval = 7;
dataSetter<int> setter(setval);
parallel_for_each(ints.begin(), ints.end(), setter, minProbSize);//parallel assign everything to 7
//some sort of wait function to go here?
std::cout << std::endl << "PS sum of all ints = " << parallel_sum(ints.begin(), ints.end()) << std::endl; //parallel sum the entries
int total = 0;//serial sum the entries
for (unsigned i = 0; i < ints.size(); i++)
total += ints[i];
std::cout << std::endl << "S sum of all ints = " << total << std::endl;
std::cout << std::endl << "PS sum of all ints = " << parallel_sum(ints.begin(), ints.end()) << std::endl; //parallel sum the entries again
Here are some outputs :
PS sum of all ints = 689052
S sum of all ints = 700000
PS sum of all ints = 700000
output from another run:
PS sum of all ints = 514024
S sum of all ints = 700000
PS sum of all ints = 700000
It consistently gets the first parallel sum over the vector low. My guess as to what is happening is that all the assignment threads get created, then the summing threads get created, but certain sum threads are executing prematurely (before the last assignment thread). Is there any way I can force a wait? And as always, I'm open to all advice.
Upvotes: 2
Views: 1290
Reputation: 275385
MSVS 2013 shipped with a non-standard compliant std async. From what I heard this was intentional.
This non-compliant std async failed to return futures that block on task completion when invoked with an async launch policy.
The result is your code is correct, but your compiler is broken. Either upgrade to 2015/2017 or add a handle.get()
just before it goes out of scope.
Personally I would write such a utility a bit differently.
auto launch_async = [](auto&& f){
return std::async( std::launch::async, decltype(f)(f) );
};
template <class Linear, class Executor=decltype(launch_async) const&>
auto parallel_algo(Linear&& linear, std::size_t chunk_size, Executor&& exec=launch_async){
return
[
linear=std::forward<Linear>(linear),
chunk_size,
exec=std::forward<Executor>(exec)
]
( auto start, auto finish )
{
std::size_t count = finish-start;
if (count <= chunk_size) {
linear( start, finish);
return;
}
std::size_t par = (count+chunk_size-1)/chunk_size;
std::vector<std::future<void>> tasks( par-1 );
auto task=[&]( auto i ){
auto b = start+( count*i/par );
auto e = start+( count*(i+1)/par );
return [b,e,linear]{ linear(b,e); };
};
for(auto& f:tasks){
auto i = &f-tasks.data();
f = exec( task(i) );
}
task(par-1)();
for (auto&f:tasks) f.get();
};
}
template<class F>
auto foreacher( F&&f ){
return [f=std::forward<F>(f)]( auto b, auto e ){
for (auto i=b; i!=e; ++i) f(*i);
};
}
which is C++14 but can be emulated in C++11.
What this does is take a linear algorithm and a max chunk size, return a parallel algorithm.
In both cases, algorithms take an iterator range.
While I was in there, the parallel algorithm factory takes an executor. You can write an exectuor, for example a thread pool, to avoid generating too many threads needlessly. And single element algorithms use foreacher to lift themselves to range algorithms.
Some toy executors:
auto launch_deferred = [](auto&& f){
return std:async( std::launch::deferred, decltype(f)(f) );
};
template<class F, class R=std::result_of_t< decltype(f)() >>
std::enable_if_t< !std::is_same<R,void>, std::future<R> >
make_ready_future( F&& f ) {
std::promise<R> p;
try {
p.set_value( decltype(f)(f)() );
} catch( ... ) {
p.set_exception(std::current_exception());
}
return p.get_future();
}
template<class F, class R=std::result_of_t< decltype(f)() >>
std::enable_if_t< std::is_same<R,void>, std::future<R> >
make_ready_future( F&& f ) {
std::promise<void> p;
try {
decltype(f)(f)();
p.set_value();
} catch( ... ) {
p.set_exception(std::current_exception());
}
return p.get_future();
}
auto launch_ready = [](auto&& f){
return make_ready_future( decltype(f)(f) );
};
Both of these make your parallel code run in a single thread.
A fancier one queues tasks in a thread pool and similarly returns futures.
Here is test code:
std::vector<int> v(10000);
parallel_algo( foreacher([](auto&x){x=7;}), 100 )( v.begin(), v.end() );
std::atomic<int> total(0);
parallel_algo( foreacher([&total](auto&x){total+=x;}), 100 )( v.begin(), v.end() );
std::cout << total << "\n";
Upvotes: 1