Reputation: 15337
I am using std::thread
and gcc as my compiler in implementing the parallel-merge as described in Cormen's Introduction to Algorithms.
I think I got the code to work. It passes all randomly seeded arrays that are not too big. However, when I try to merge two arrays that are large (1e6 elements each), I get the following termination:
terminate called without an active exception
terminate called recursively
terminate called recursively
Using gdb doesn't help: it becomes corrupted during the run.
I am pretty certain that the run has failed due to too many threads spawned.
What can I do to confirm that this error is due to too many std::threads spawned?
NOTES
UPDATE
CODE
#include <vector>
#include <iostream>
#include <algorithm>
#include <vector>
#include <thread>
#include <cmath>
#include <cstring>
#include <cassert>
#define STRBUF_SIZE 1024
class Random
{
public:
Random( unsigned int seed=::time(nullptr))
: m_seed( seed )
{ }
// between [ 0 .. n-1 ]
unsigned int rand_uint( unsigned int n )
{
return static_cast<unsigned int>
(static_cast<float>(n) * rand_r( &m_seed ) / RAND_MAX);
}
unsigned int getSeed() const { return m_seed; }
private:
unsigned int m_seed;
};
template<typename T>
char* dump( char* line, T it1, T it2 )
{
char buf[80];
line[0] = '\0';
for( T it=it1; it!=it2; ++it )
{
sprintf( buf, "%u ", *it );
strcat( line, buf );
}
return line;
}
template< typename T, class It >
It binary_search_it( It beg, It end, const T& value )
{
auto low = beg;
auto high = std::max( beg, end ); // end+1
while( low < high )
{
auto mid = low + std::distance( low, high ) / 2;
if ( value <= *mid )
high = mid;
else
low = mid + 1;
}
return high;
}
template< class InputIt, class OutputIt >
void p_merge(
char const* msg,
unsigned depth,
unsigned parent_lvl_id,
unsigned lr,
InputIt p1, InputIt r1,
InputIt p2, InputIt r2,
OutputIt p3, OutputIt r3
)
{
#ifdef DEBUG
char buff[STRBUF_SIZE];
#endif
unsigned sum_prev = pow( 2, depth ) - 1;
unsigned lvl_id = 2*parent_lvl_id + lr;
unsigned thread_no = sum_prev + lvl_id + 1;
unsigned limit0 = sum_prev + 1;
unsigned limit1 = pow( 2, depth+1 ) - 1;
#ifdef DEBUG
char msg_dep[256];
sprintf( msg_dep, "%s [%2d] %-10d [%d,%d]", msg, depth, thread_no, limit0, limit1 );
fprintf( stderr, "%s\n", msg_dep );
#endif
if ( thread_no<limit0 || thread_no>limit1 )
{
fprintf( stderr, "OUT OF BOUNDS\n" );
exit( 1 );
}
auto n1 = std::distance( p1, r1 );
auto n2 = std::distance( p2, r2 );
#ifdef DEBUG
fprintf( stderr, "%s dist[v1]=%2ld : %s\n", msg_dep, n1, dump( buff, p1, r1 ) );
fprintf( stderr, "%s dist[v2]=%2ld : %s\n", msg_dep, n2, dump( buff, p2, r2 ) );
#endif
if ( n1<n2 )
{
std::swap( p1, p2 );
std::swap( r1, r2 );
std::swap( n1, n2 );
#ifdef DEBUG
fprintf( stderr, "%s swapped[v1] : %s\n", msg_dep, dump( buff, p1, r1 ));
fprintf( stderr, "%s swapped[v2] : %s\n", msg_dep, dump( buff, p2, r2 ));
#endif
}
if ( n1==0 )
{
#ifdef DEBUG
fprintf( stderr, "%s done \n", msg_dep );
#endif
return;
}
auto q1 = p1 + n1 / 2; // midpoint
auto q2 = binary_search_it( p2, r2, *q1 ); // <q1 q2[q1] >=q1
auto q3 = p3 + std::distance( p1, q1 ) + std::distance( p2, q2 );
*q3 = *q1;
#ifdef DEBUG
fprintf( stderr, "%s q1[median]=%u : %s\n", msg_dep, *q1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q2[fulcrum]=%u : %s\n", msg_dep, *q2, dump( buff, p2, r2 ));
fprintf( stderr, "%s q3(copied)=%u : %s\n", msg_dep, *q3, dump( buff, p3, r3 ));
#endif
#ifdef DEBUG
auto d1 = std::distance( p1, q1-1 );
auto d2 = std::distance( q1+1, r1 );
fprintf( stderr, "%s q1[dist_L]=%ld : %s\n", msg_dep, d1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q1[dist_M]=%ld : %s\n", msg_dep, d2, dump( buff, p1, r1 ));
#endif
try {
std::thread t1{
[&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1, p2, q2, p3, r3 ); }
};
std::thread t2{
[&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); }
};
t1.join();
t2.join();
}
catch( ... )
{
fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
exit( 1 );
}
#ifdef DEBUG
fprintf( stderr, "%s synchronized\n", msg_dep );
#endif
}
int
main( int argv, char* argc[] )
{
// ok up to 1e4, fails by 1e5
unsigned n = 1e5;
Random r;
std::vector<unsigned> v1( n ), v2( n ), v3( 2 * n );
#ifdef DEBUG
fprintf( stderr, "SEED = %u\n", r.getSeed() );
#endif
std::generate( v1.begin(), v1.end(), [&]() { return r.rand_uint(n); } );
std::generate( v2.begin(), v2.end(), [&]() { return r.rand_uint(n); } );
#ifdef DEBUG
char buff[STRBUF_SIZE];
fprintf( stderr, "%s\n", dump( buff, v1.begin(), v1.end() ));
fprintf( stderr, "%s\n", dump( buff, v2.begin(), v2.end() ));
#endif
std::sort( v1.begin(), v1.end() );
std::sort( v2.begin(), v2.end() );
p_merge( "TOP ", 0, 0, 0,
v1.begin(), v1.end(), v2.begin(), v2.end(), v3.begin(), v3.end() );
assert( std::is_sorted( v3.begin(), v3.end() ));
#ifdef DEBUG
fprintf( stderr, "FINAL : %s\n", dump( buff, v3.begin(), v3.end() ));
#endif
}
Upvotes: 4
Views: 842
Reputation: 504333
You can catch std::system_error
and check if the code is resource_unavailable_try_again
:
#include <atomic>
#include <iostream>
#include <system_error>
#include <thread>
#include <vector>
class thread_collection
{
public:
thread_collection() :
mStop(false)
{}
~thread_collection()
{
clear();
}
template <typename Func, typename... Args>
bool add(Func&& func, Args&&... args)
{
try
{
mThreads.emplace_back(std::forward<Func>(func),
std::cref(mStop),
std::forward<Args>(args)...);
}
catch (const std::system_error& e)
{
if (e.code().value() == std::errc::resource_unavailable_try_again)
return false; // not possible to make more threads right now
else
throw; // something else
}
return true; // can keep going
}
void clear()
{
mStop = true;
for (auto& thread : mThreads)
{
if (thread.joinable())
thread.join();
}
mThreads.clear();
mStop = true;
}
std::size_t size() const
{
return mThreads.size();
}
private:
thread_collection(const thread_collection&);
thread_collection& operator=(const thread_collection&);
std::atomic<bool> mStop;
std::vector<std::thread> mThreads;
};
void worker(const std::atomic<bool>& stop)
{
while (!stop)
std::this_thread::yield();
}
int main()
{
thread_collection threads;
try
{
while (threads.add(worker))
continue;
std::cout << "Exhausted thread resources!" << std::endl;
}
catch (const std::exception& e)
{
std::cout << "Stopped for some other reason: " << e.what() << std::endl;
}
std::cout << "Made: " << threads.size() << " threads." << std::endl;
threads.clear();
}
(Run this at your own risk!)
According §30.3.1.2/4, this is the error code used to indicate thread creation failure:
Error conditions:
resource_unavailable_try_again — the system lacked the necessary resources to create another thread, or the system-imposed limit on the number of threads in a process would be exceeded.
Note this could be thrown by your own arguments being copied to the resulting thread. To guarantee against this, you need to pre-construct your arguments, then no-throw move them to your thread function.
That said, you're much better off putting a limit on thread creation anyway. There's no point in having more threads running than cores can execute. Use std::thread::hardware_concurrency
to get that number.
Upvotes: 5
Reputation: 171501
try {
std::thread t1{
[&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1, p2, q2, p3, r3 ); }
};
std::thread t2{
[&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); }
};
t1.join();
t2.join();
}
catch( ... )
{
fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
exit( 1 );
}
This code may not do what you expect. If constructing t2
throws an exception then t1
will be destroyed but because that thread is joinable it will call std::terminate()
so your catch
will not handle the exception.
One reason you're seeing terminate called recursively
could be that lots of threads have that same problem at once, so lots of threads call terminate()
at similar times.
This would work instead:
std::thread t1;
std::thread t2;
try {
t1 = std::thread{
[&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1, p2, q2, p3, r3 ); }
};
t2 = std::thread{
[&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); }
};
}
catch( ... )
{
fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
exit( 1 );
}
t1.join();
t2.join();
I don't think it's a problem here because the only place in p_merge
that can throw is inside a try
block, but you should also be aware that if an exception leaves the function run by std::thread
that will also call std::terminate()
so it that's not what you want then you should pass noexcept
functions (or just non-throwing functions) to std::thread
.
Upvotes: 2