greedybuddha

Reputation: 7507

Boost MPI Crashing on world.iprobe

I'm doing a project where I am using Boost MPI to create a Primary/Secondary (Master/Slave) system, where the Primary distributes work to the Secondaries in a loop. Roughly 30% of the time the program crashes with the following output.

runner(72828,0x7fff903c6380) malloc: *** error for object 0x7fae75b07af8: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
[Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Segmentation fault: 11 (11)
[Lappies:72828] Signal code: Address not mapped (1)
[Lappies:72828] Failing at address: 0x8
[Lappies:72828] [ 0] 0   libsystem_platform.dylib            0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] 0   libdyld.dylib                       0x00007fff57d99292 dyld_stub_binder + 282
[Lappies:72828] [ 2] 0   libmpi.40.dylib                     0x000000010be67339 ompi_comm_destruct + 32
[Lappies:72828] [ 3] 0   mca_pml_ob1.so                      0x000000010d2c5308 mca_pml_ob1_iprobe + 549
[Lappies:72828] [ 4] 0   libmpi.40.dylib                     0x000000010bea007f MPI_Iprobe + 284
[Lappies:72828] [ 5] 0   libboost_mpi-mt.dylib               0x000000010c005aae _ZNK5boost3mpi12communicator6iprobeEii + 62
[Lappies:72828] [ 6] [Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Abort trap: 6 (6)
[Lappies:72828] Signal code:  (0)
[Lappies:72828] [ 0] 0   run_hi                              0x000000010b539155 _ZN9Secondary12recvMessagesEv + 69
0   libsystem_platform.dylib            0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] [Lappies:72828] [ 7] 0   ???                                 0x000000000000ffff 0x0 + 65535
[Lappies:72828] [ 2] 0   run_hi                              0x000000010b526915 _ZN9Secondary3runEv + 53
[Lappies:72828] [ 8] 0   libsystem_c.dylib                   0x00007fff57e451ae abort + 127
[Lappies:72828] [ 3] 0   run_hi                              0x000000010b52ff43 _ZN7Primary9runWorkerEv + 35
[Lappies:72828] 0   libsystem_malloc.dylib              0x00007fff57f4ead4 szone_error + 596
[ 9] [Lappies:72828] [ 4] 0   run_hi                              0x000000010b532bb1 _ZNSt3__114__thread_proxyINS_5tupleIJNS_10unique_ptrINS_15__thread_structENS_14default_deleteIS3_EEEEPFvvEEEEEEPvSA_ + 497
[Lappies:72828] [10] 0   libsystem_malloc.dylib              0x00007fff57f44721 tiny_free_list_remove_ptr + 298
[Lappies:72828] [ 5] 0   libsystem_pthread.dylib             0x00007fff580b1661 _pthread_body + 340
[Lappies:72828] [11] 0   libsystem_pthread.dylib             0x00007fff580b150d _pthread_body + 0
[Lappies:72828] [12] 0   libsystem_pthread.dylib             0x00007fff580b0bf9 thread_start + 13
[Lappies:72828] *** End of error message ***
0   libsystem_malloc.dylib              0x00007fff57f59aca tiny_free_no_lock + 1450
[Lappies:72828] [ 6] 0   libsystem_malloc.dylib              0x00007fff57f5a256 free_tiny + 628
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 0 on node Lappies exited on     signal 11 (Segmentation fault: 11).
--------------------------------------------------------------------------

As you can see, it is crashing inside the Boost MPI iprobe call. The crash sometimes happens in Secondary::recvMessages and sometimes in Secondary::checkQuit, with about equal frequency.

Is the problem in my code? Or is there a bug within Boost MPI, and if so, how can I work around it? Either way, I'm stumped on how to resolve this.

Secondary.hpp

namespace mpi = boost::mpi;

class Secondary {
    mpi::communicator world;

    volatile bool quit;
    std::list<WorkItem*> queue;
    std::list<mpi::request> sends;
    int rank;
    void resolveSends(){
        if (!sends.empty()){
            sends.remove_if([](mpi::request req){ return req.test(); });
        }
    }
public:
    explicit Secondary(int rank) : rank(rank), quit(false) {}

    void dowork(){
        /// take everything from the queue and do the work
        if (!queue.empty()){ /// Do work
            WorkItem* pwi = queue.front();
            queue.pop_front();
            std::list<WorkItem*> l = pwi->work();
            for (auto& i : l){
                queue.push_back(i);
            }
            ReturnResult rr = ReturnResult(rank, queue.size());
            std::cout << "    Secondary " << rank << "  workleft=" << queue.size()
                    << "  isend (" << pwi->getId() << " ) " << rr.workerid << " : " << rr.remaining <<std::endl;

            sends.push_back(world.isend(0, TagType::WORK_STATUS, rr.remaining));
            checkQuit(); /// make sure we respond to quits
            delete pwi;
        }
        resolveSends();
    }

    void recvMessages() {
        while (world.iprobe(0, TagType::WORK)) {
            std::vector<WorkItem*> pwi;
            world.recv(0, TagType::WORK, pwi);
            std::cout << "    Secondary " << world.rank() << "  found (" << pwi.size() << " ) " << std::endl;
            for (auto& i : pwi){
                queue.push_back(i);
            }
        }
    }

    bool checkQuit() {
        if (world.iprobe(0, TagType::QUIT)) {
            std::cout << "    Secondary " << world.rank() << "  found QUIT  " << std::endl;
            world.recv(0, TagType::QUIT);
            quit = true;
            return true;
        }
        return false;
    }

    void run() {
        /// for some reason going through this loop will often cause iprobe crashes
        while (!quit) {
            recvMessages(); /// check for more workitems

            dowork(); /// do the work

            if (checkQuit()){ /// check for quit
                resolveSends();
                break;
            }
            /// Yield is insufficient
            std::this_thread::yield();
        }
    }
};

Primary.hpp

namespace mpi = boost::mpi;

class Primary {
    mpi::communicator world;

    JobHandler jh;
    std::list<mpi::request> sends;
    void resolveSends(){
        if (!sends.empty()) {
            sends.remove_if([](mpi::request req) { return req.test(); });
        }
    }
public:
    Primary() : jh(world.size()){}

    static void runWorker(){
        Secondary worker(0);
        worker.run();
    }

    void runJobs(){
        std::vector<int> workerIds(world.size());
        std::iota(workerIds.begin(), workerIds.end(), 0);
        const int nworkers = static_cast<int>(workerIds.size());

        std::thread workerThread = std::thread(runWorker);
        std::list<WorkItem*> allitems = jh.getAllItems();
        int sendto;
        while (true) {
            /// Send all of our items
            if (!allitems.empty()){
                std::vector<WorkItem*> v{ std::begin(allitems), std::end(allitems) };
                for (int i=0, sendto=1; i< nworkers;++i, ++sendto){
                    std::vector<WorkItem *> send = vecutil::split(v, nworkers, i);
                    std::cout << " >>> " << workerIds[sendto % nworkers] << " " << send.size() << " " << allitems.size() << std::endl;
                    sends.push_back(world.isend(workerIds[sendto % nworkers], TagType::WORK, send));
                    jh.sendingWorkTo(sendto, send.size());
                }
            }
            resolveSends();

            std::this_thread::yield();
            std::for_each(allitems.begin(), allitems.end(), vecutil::DeleteVector<WorkItem*>());
            allitems.clear();

            /// Check for done items
            for (auto& i : workerIds) {
                while (world.iprobe(i, TagType::WORK_STATUS)) {
                    int r;
                    mpi::request req = world.irecv(i, TagType::WORK_STATUS, r);
                    jh.workItemComplete(ReturnResult(i,r));
                    std::cout << jh << "                 <<< Secondary " <<i << "  remaining=" << r <<
                            "  complete=" << jh.isComplete() << std::endl;
                }
            }
            /// Check for complete
            if (jh.isComplete()){
                std::cout << "====\n >>> complete " << std::endl;
                break;
            }
            std::this_thread::yield();
        }

        std::cout << "======================  sending quit " << std::endl;
        mpi::request reqs[nworkers];
        int n=0;
        for (auto& i : workerIds) {
            reqs[n++] = world.isend(i, TagType::QUIT);
        }
        mpi::wait_all(reqs, reqs + nworkers);
        std::cout << "======================  gathering " << std::endl;
        workerThread.join();
        std::cout << "======================  quitting " << std::endl;
    }



    void addJob(Job* pjob) {
        jh.addJob(pjob);
    }
};

I can provide whatever additional code that is needed but I think this is the relevant section.

Upvotes: 0

Views: 158

Answers (1)

Daniel Langr

Reputation: 23527

It seems that you use the MPI library (wrapped by Boost.MPI) from multiple threads. In that case, you need to initialize the MPI library accordingly. However, you need to be sure that the underlying MPI library supports threading, and at which level. Beyond the default single level there are three levels, called funneled, serialized, and multiple. Then you can use, e.g.:

mpi::environment env(argc, argv, mpi::threading::multiple);

However, I can't find in the documentation what happens in this case if the MPI library doesn't support MPI_THREAD_MULTIPLE.
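A minimal, untested sketch of how this could look (the main function and the commented-out calls are placeholders, not your actual code); if I remember correctly, mpi::environment::thread_level() reports the level the library actually granted, so you can bail out if it is lower than multiple:

#include <boost/mpi.hpp>
#include <iostream>

namespace mpi = boost::mpi;

int main(int argc, char* argv[]) {
    // Request full multi-threaded support (MPI_THREAD_MULTIPLE) up front.
    mpi::environment env(argc, argv, mpi::threading::multiple);

    // The library is allowed to grant a lower level than requested,
    // so verify it before letting two threads touch the communicator.
    if (mpi::environment::thread_level() != mpi::threading::multiple) {
        std::cerr << "MPI_THREAD_MULTIPLE not granted; "
                     "running the worker thread is not safe\n";
        return 1;
    }

    mpi::communicator world;
    if (world.rank() == 0) {
        // Primary process: spawns the in-process Secondary thread and
        // distributes work, as in the question.
        // Primary p; p.runJobs();
    } else {
        // Secondary processes.
        // Secondary s(world.rank()); s.run();
    }
    return 0;
}

Without multiple (or at least serialized plus your own locking), concurrent MPI_Iprobe calls from the worker thread and the main thread are exactly the kind of use that can corrupt the library's internal state and produce crashes like the one in your trace.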

Upvotes: 1
