kfmfe04

Reputation: 15337

How do I verify if runtime failure is due to too many threads spawned?

I am using std::thread and gcc as my compiler in implementing the parallel-merge as described in Cormen's Introduction to Algorithms.

I think I got the code to work. It passes all randomly seeded arrays that are not too big. However, when I try to merge two arrays that are large (1e6 elements each), I get the following termination:

terminate called without an active exception
terminate called recursively
terminate called recursively

Using gdb doesn't help: it becomes corrupted during the run.

I am pretty certain that the run has failed due to too many threads spawned.

What can I do to confirm that this error is due to too many std::threads spawned?

NOTES

  1. Code works up to n=1e4, fails by n=1e5
  2. Define DEBUG if you want to see output, but I don't recommend this except for small n like 10 or 50.
  3. STRBUF_SIZE/use of fprintf is ugly, but iostream doesn't flush well in threads - this is hacky, but works (no need to focus here).
  4. I tried following Barnes53's suggestion by using a try/catch block around the threads, but this didn't work, apparently.
  5. I know that spawning a gazillion threads is a bad thing - at this point, I am just trying to implement what's in the book and to see if it works, and perhaps discover what its limitations are.

UPDATE

  1. GManNickG's answer below helped: not every run, but during some runs of 1e5, I can see that, indeed, resources are gone.
  2. I will probably look into some kind of k-way parallel sort, where I can control the number of threads spawned, if this algorithm is not salvageable.

CODE

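#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdio>    // fprintf, sprintf
#include <cstdlib>   // exit, rand_r (POSIX), RAND_MAX
#include <cstring>
#include <ctime>     // time
#include <iostream>
#include <iterator>  // std::distance
#include <thread>
#include <vector>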

#define STRBUF_SIZE 1024

class Random
{
public:
    Random( unsigned int seed=::time(nullptr))
        : m_seed( seed )
    { }
    // between [ 0 .. n-1 ]
    unsigned int rand_uint( unsigned int n )
    {
        return static_cast<unsigned int>
                     (static_cast<float>(n) * rand_r( &m_seed ) / RAND_MAX);
    }
    unsigned int getSeed() const { return m_seed; }
private:
    unsigned int m_seed;
};

template<typename T>
char* dump( char* line, T it1, T it2 )
{
    char buf[80];
    line[0] = '\0';
    for( T it=it1; it!=it2; ++it )
    {
        sprintf( buf, "%u ", *it );
        strcat(  line, buf );
    }
    return line;
}

template< typename T, class It >
It binary_search_it( It beg, It end, const T& value )
{
    auto low  = beg;
    auto high = std::max( beg, end );   // end+1
    while( low < high )
    {
        auto mid = low + std::distance( low, high ) / 2;
        if ( value <= *mid )
            high = mid;
        else
            low = mid + 1;
    }
    return high;
}

template< class InputIt, class OutputIt >
void p_merge( 
    char const*  msg, 
    unsigned     depth,
    unsigned     parent_lvl_id,
    unsigned     lr,
    InputIt  p1, InputIt  r1, 
    InputIt  p2, InputIt  r2, 
    OutputIt p3, OutputIt r3
    )
{
#ifdef DEBUG
    char buff[STRBUF_SIZE];
#endif
    unsigned sum_prev  = pow( 2, depth ) - 1;
    unsigned lvl_id    = 2*parent_lvl_id + lr;
    unsigned thread_no = sum_prev + lvl_id + 1;

    unsigned limit0    = sum_prev + 1;
    unsigned limit1    = pow( 2, depth+1 ) - 1;

#ifdef DEBUG
    char msg_dep[256];
    sprintf( msg_dep, "%s [%2d] %-10d [%d,%d]", msg, depth, thread_no, limit0, limit1 );
    fprintf( stderr, "%s\n", msg_dep );
#endif

    if ( thread_no<limit0 || thread_no>limit1 )
    {
        fprintf( stderr, "OUT OF BOUNDS\n" );
        exit( 1 );
    }

    auto n1 = std::distance( p1, r1 );
    auto n2 = std::distance( p2, r2 );
#ifdef DEBUG
    fprintf( stderr, "%s dist[v1]=%2ld   : %s\n", msg_dep, n1, dump( buff, p1, r1 ) );
    fprintf( stderr, "%s dist[v2]=%2ld   : %s\n", msg_dep, n2, dump( buff, p2, r2 ) );
#endif
    if ( n1<n2 )
    {
        std::swap( p1, p2 );
        std::swap( r1, r2 );
        std::swap( n1, n2 );
#ifdef DEBUG
      fprintf( stderr, "%s swapped[v1]   : %s\n", msg_dep, dump( buff, p1, r1 ));
      fprintf( stderr, "%s swapped[v2]   : %s\n", msg_dep, dump( buff, p2, r2 ));
#endif
    }
    if ( n1==0 )
    {
#ifdef DEBUG
      fprintf( stderr, "%s done              \n", msg_dep );
#endif
        return;
    }
    auto q1 = p1 + n1 / 2;   // midpoint
    auto q2 = binary_search_it( p2, r2, *q1 );  // <q1   q2[q1]   >=q1
    auto q3 = p3 + std::distance( p1, q1 ) + std::distance( p2, q2 );
    *q3 = *q1;

#ifdef DEBUG
    fprintf( stderr, "%s q1[median]=%u  : %s\n", msg_dep, *q1, dump( buff, p1, r1 ));
    fprintf( stderr, "%s q2[fulcrum]=%u : %s\n", msg_dep, *q2, dump( buff, p2, r2 ));
    fprintf( stderr, "%s q3(copied)=%u  : %s\n", msg_dep, *q3, dump( buff, p3, r3 ));
#endif

#ifdef DEBUG
    auto d1 = std::distance( p1,   q1-1 );
    auto d2 = std::distance( q1+1, r1   );
    fprintf( stderr, "%s q1[dist_L]=%ld  : %s\n", msg_dep, d1, dump( buff, p1, r1 ));
    fprintf( stderr, "%s q1[dist_M]=%ld  : %s\n", msg_dep, d2, dump( buff, p1, r1 ));
#endif


    try {
        std::thread t1{ 
            [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
        };
        std::thread t2{ 
            [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
        };
        t1.join();
        t2.join();
    }
    catch( ... )
    {
        fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
        exit( 1 );
    }

#ifdef DEBUG
    fprintf( stderr, "%s synchronized\n", msg_dep );
#endif
}

int
main( int argc, char* argv[] )
{
    // ok up to 1e4, fails by 1e5
    unsigned n = 1e5; 
    Random   r;
    std::vector<unsigned> v1( n ), v2( n ), v3( 2 * n );

#ifdef DEBUG
    fprintf( stderr, "SEED = %u\n", r.getSeed() );
#endif

    std::generate( v1.begin(), v1.end(), [&]() { return r.rand_uint(n); } );
    std::generate( v2.begin(), v2.end(), [&]() { return r.rand_uint(n); } );

#ifdef DEBUG
    char buff[STRBUF_SIZE];
    fprintf( stderr, "%s\n", dump( buff, v1.begin(), v1.end() ));
    fprintf( stderr, "%s\n", dump( buff, v2.begin(), v2.end() ));
#endif

    std::sort( v1.begin(), v1.end() );
    std::sort( v2.begin(), v2.end() );

    p_merge( "TOP ", 0, 0, 0,
        v1.begin(), v1.end(), v2.begin(), v2.end(), v3.begin(), v3.end() );

    assert( std::is_sorted( v3.begin(), v3.end() ));

#ifdef DEBUG
    fprintf( stderr, "FINAL : %s\n", dump( buff, v3.begin(), v3.end() ));
#endif
}

Upvotes: 4

Views: 842

Answers (2)

GManNickG

Reputation: 504333

You can catch std::system_error and check if the code is resource_unavailable_try_again:

#include <atomic>
#include <iostream>
#include <system_error>
#include <thread>
#include <vector>

class thread_collection
{
public:
    thread_collection() :
    mStop(false)
    {}

    ~thread_collection()
    {
        clear();
    }

    template <typename Func, typename... Args>
    bool add(Func&& func, Args&&... args)
    {
        try
        {
            mThreads.emplace_back(std::forward<Func>(func),
                                  std::cref(mStop),
                                  std::forward<Args>(args)...);
        }
        catch (const std::system_error& e)
        {
            if (e.code() == std::errc::resource_unavailable_try_again)
                return false; // not possible to make more threads right now
            else
                throw; // something else
        }

        return true; // can keep going
    }

    void clear()
    {
        mStop = true;
        for (auto& thread : mThreads)
        {
            if (thread.joinable())
                thread.join();
        }

        mThreads.clear();
        mStop = false; // reset so the collection can be reused
    }

    std::size_t size() const
    {
        return mThreads.size();
    }

private:
    thread_collection(const thread_collection&);
    thread_collection& operator=(const thread_collection&);

    std::atomic<bool> mStop;
    std::vector<std::thread> mThreads;
};

void worker(const std::atomic<bool>& stop)
{
    while (!stop)
        std::this_thread::yield();
}

int main()
{
    thread_collection threads;

    try
    {
        while (threads.add(worker))
            continue;

        std::cout << "Exhausted thread resources!" << std::endl;
    }
    catch (const std::exception& e)
    {
        std::cout << "Stopped for some other reason: " << e.what() << std::endl;
    }

    std::cout << "Made: " << threads.size() << " threads." << std::endl;
    threads.clear();
}

(Run this at your own risk!)

According to §30.3.1.2/4 of the C++11 standard, this is the error code used to indicate thread creation failure:

Error conditions:
resource_unavailable_try_again — the system lacked the necessary resources to create another thread, or the system-imposed limit on the number of threads in a process would be exceeded.

Note that the std::thread constructor can also throw because copying your own arguments into the new thread throws. To rule that out, pre-construct your arguments, then move them into the thread function with a no-throw move.
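As a minimal sketch of that idea (the helper name `sum_on_thread` and the choice of `std::vector<int>` are assumptions made for illustration only): the argument is built before the thread is created and then moved in, so any copy happens at the call site rather than inside the std::thread constructor.

```cpp
#include <cassert>
#include <numeric>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical helper: sums a pre-built vector on a worker thread.
// `data` is taken by value, so any (possibly throwing) copy happens
// when the caller invokes this function, before a thread exists.
long long sum_on_thread(std::vector<int> data)
{
    // Moving a std::vector<int> does not throw on the common standard libraries.
    static_assert(std::is_nothrow_move_constructible<std::vector<int>>::value,
                  "argument must be no-throw movable");

    long long total = 0;   // written by the worker, read after join()
    std::thread worker(
        [&total](std::vector<int> v) {
            total = std::accumulate(v.begin(), v.end(), 0LL);
        },
        std::move(data));  // moved, not copied, into the thread's storage
    worker.join();
    return total;
}
```

With the arguments handled this way, a std::system_error escaping the constructor is far more likely to point at thread creation itself.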

That said, you're much better off putting a limit on thread creation anyway. There's no point in having more threads running than cores can execute. Use std::thread::hardware_concurrency to get that number.
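One way to apply such a limit to the merge in the question is sketched below. This is not the book's algorithm verbatim: `bounded_p_merge`, `merge_sorted`, the `budget` parameter and the 1024-element cutoff are all assumptions made for illustration. Each call splits a thread budget between its two halves and falls back to a serial std::merge when the budget or the work runs out.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: merges sorted [p1,r1) and [p2,r2) into the range at p3.
// `budget` is how many threads this call may keep busy, itself included.
template <class It, class Out>
void bounded_p_merge(It p1, It r1, It p2, It r2, Out p3, unsigned budget)
{
    auto n1 = std::distance(p1, r1);
    auto n2 = std::distance(p2, r2);
    if (n1 < n2) { std::swap(p1, p2); std::swap(r1, r2); std::swap(n1, n2); }

    // No budget left, or too little work to be worth a thread: merge serially.
    if (budget <= 1 || n1 < 1024) {
        std::merge(p1, r1, p2, r2, p3);
        return;
    }

    auto q1 = p1 + n1 / 2;                     // median of the larger range
    auto q2 = std::lower_bound(p2, r2, *q1);   // split point in the smaller range
    auto q3 = p3 + (q1 - p1) + (q2 - p2);      // final position of the median
    *q3 = *q1;

    auto do_left = [=] { bounded_p_merge(p1, q1, p2, q2, p3, budget / 2); };
    std::thread left;
    try {
        left = std::thread(do_left);
    } catch (const std::system_error&) {
        do_left();                             // could not spawn: do the work here
    }
    // The right half runs on the current thread instead of a second new one.
    bounded_p_merge(q1 + 1, r1, q2, r2, q3 + 1, budget - budget / 2);
    if (left.joinable())
        left.join();
}

// Hypothetical convenience wrapper that picks the budget from the hardware.
template <class T>
std::vector<T> merge_sorted(const std::vector<T>& a, const std::vector<T>& b)
{
    assert(std::is_sorted(a.begin(), a.end()) && std::is_sorted(b.begin(), b.end()));
    std::vector<T> out(a.size() + b.size());
    unsigned hw = std::thread::hardware_concurrency();   // may be 0 if unknown
    bounded_p_merge(a.begin(), a.end(), b.begin(), b.end(), out.begin(), hw ? hw : 2);
    return out;
}
```

Because the budget is halved at every level, the number of live threads stays near the initial budget regardless of n, and a failed spawn degrades to serial work rather than calling std::terminate().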

Upvotes: 5

Jonathan Wakely

Reputation: 171501

try {
    std::thread t1{ 
        [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
    };
    std::thread t2{ 
        [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
    };
    t1.join();
    t2.join();
}
catch( ... )
{
    fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
    exit( 1 );
}

This code may not do what you expect. If constructing t2 throws an exception, t1 is destroyed during stack unwinding; because that thread is still joinable, its destructor calls std::terminate(), so your catch block never handles the exception.

One reason you're seeing terminate called recursively could be that lots of threads have that same problem at once, so lots of threads call terminate() at similar times.

This would work instead:

std::thread t1;
std::thread t2;

try {
    t1 = std::thread{ 
        [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
    };
    t2 = std::thread{ 
        [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
    };
}
catch( ... )
{
    fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
    exit( 1 );
}

t1.join();
t2.join();

I don't think it's a problem here, because the only place in p_merge that can throw is inside a try block. You should also be aware, though, that if an exception leaves the function run by std::thread, that will also call std::terminate(). If that's not what you want, pass noexcept functions (or just non-throwing functions) to std::thread.
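If the thread function can throw and you would rather see the exception than terminate, one common pattern is to catch everything inside the thread and hand it back through std::exception_ptr. A minimal sketch, where the helper name `run_and_capture` is an assumption for illustration:

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <thread>

// Hypothetical helper: runs `f` on a new thread and returns any exception
// it threw, instead of letting it escape the thread and call std::terminate().
template <class F>
std::exception_ptr run_and_capture(F f)
{
    std::exception_ptr error;   // written by the worker, read after join()
    std::thread t([&error, &f]() noexcept {
        try {
            f();
        } catch (...) {
            error = std::current_exception();   // keep it for the caller
        }
    });
    t.join();                   // the worker has finished before we read `error`
    return error;
}
```

The caller can then test the returned pointer and call std::rethrow_exception on it to handle the failure on its own thread.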

Upvotes: 2
