I'm working on a project where I use Boost MPI to create a Primary/Secondary (Master/Slave) system in which the Primary distributes work to the Secondaries in a loop. About 30% of the time the program crashes with the following error.
runner(72828,0x7fff903c6380) malloc: *** error for object 0x7fae75b07af8: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
[Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Segmentation fault: 11 (11)
[Lappies:72828] Signal code: Address not mapped (1)
[Lappies:72828] Failing at address: 0x8
[Lappies:72828] [ 0] 0 libsystem_platform.dylib 0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] 0 libdyld.dylib 0x00007fff57d99292 dyld_stub_binder + 282
[Lappies:72828] [ 2] 0 libmpi.40.dylib 0x000000010be67339 ompi_comm_destruct + 32
[Lappies:72828] [ 3] 0 mca_pml_ob1.so 0x000000010d2c5308 mca_pml_ob1_iprobe + 549
[Lappies:72828] [ 4] 0 libmpi.40.dylib 0x000000010bea007f MPI_Iprobe + 284
[Lappies:72828] [ 5] 0 libboost_mpi-mt.dylib 0x000000010c005aae _ZNK5boost3mpi12communicator6iprobeEii + 62
[Lappies:72828] [ 6] [Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Abort trap: 6 (6)
[Lappies:72828] Signal code: (0)
[Lappies:72828] [ 0] 0 run_hi 0x000000010b539155 _ZN9Secondary12recvMessagesEv + 69
0 libsystem_platform.dylib 0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] [Lappies:72828] [ 7] 0 ??? 0x000000000000ffff 0x0 + 65535
[Lappies:72828] [ 2] 0 run_hi 0x000000010b526915 _ZN9Secondary3runEv + 53
[Lappies:72828] [ 8] 0 libsystem_c.dylib 0x00007fff57e451ae abort + 127
[Lappies:72828] [ 3] 0 run_hi 0x000000010b52ff43 _ZN7Primary9runWorkerEv + 35
[Lappies:72828] 0 libsystem_malloc.dylib 0x00007fff57f4ead4 szone_error + 596
[ 9] [Lappies:72828] [ 4] 0 run_hi 0x000000010b532bb1 _ZNSt3__114__thread_proxyINS_5tupleIJNS_10unique_ptrINS_15__thread_structENS_14default_deleteIS3_EEEEPFvvEEEEEEPvSA_ + 497
[Lappies:72828] [10] 0 libsystem_malloc.dylib 0x00007fff57f44721 tiny_free_list_remove_ptr + 298
[Lappies:72828] [ 5] 0 libsystem_pthread.dylib 0x00007fff580b1661 _pthread_body + 340
[Lappies:72828] [11] 0 libsystem_pthread.dylib 0x00007fff580b150d _pthread_body + 0
[Lappies:72828] [12] 0 libsystem_pthread.dylib 0x00007fff580b0bf9 thread_start + 13
[Lappies:72828] *** End of error message ***
0 libsystem_malloc.dylib 0x00007fff57f59aca tiny_free_no_lock + 1450
[Lappies:72828] [ 6] 0 libsystem_malloc.dylib 0x00007fff57f5a256 free_tiny + 628
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 0 on node Lappies exited on signal 11 (Segmentation fault: 11).
--------------------------------------------------------------------------
As you can see, it is crashing in the Boost MPI iprobe call. This sometimes happens in Secondary::recvMessages and sometimes in Secondary::checkQuit, with about equal frequency.
Is this a problem with my code? Or is there a bug within Boost MPI, and if so, how can I work around it? Either way, I'm stumped on how to resolve this.
Secondary.hpp
namespace mpi = boost::mpi;
class Secondary {
    mpi::communicator world;
    volatile bool quit;
    std::list<WorkItem*> queue;
    std::list<mpi::request> sends;
    int rank;

    void resolveSends() {
        if (!sends.empty()) {
            sends.remove_if([](mpi::request req) { return req.test(); });
        }
    }

public:
    explicit Secondary(int rank) : rank(rank), quit(false) {}

    void dowork() {
        /// take everything from the queue and do the work
        if (!queue.empty()) { /// Do work
            WorkItem* pwi = queue.front();
            queue.pop_front();
            std::list<WorkItem*> l = pwi->work();
            for (auto& i : l) {
                queue.push_back(i);
            }
            ReturnResult rr = ReturnResult(rank, queue.size());
            std::cout << " Secondary " << rank << " workleft=" << queue.size()
                      << " isend (" << pwi->getId() << " ) " << rr.workerid << " : " << rr.remaining << std::endl;
            sends.push_back(world.isend(0, TagType::WORK_STATUS, rr.remaining));
            checkQuit(); /// make sure we respond to quits
            delete pwi;
        }
        resolveSends();
    }

    void recvMessages() {
        while (world.iprobe(0, TagType::WORK)) {
            std::vector<WorkItem*> pwi;
            world.recv(0, TagType::WORK, pwi);
            std::cout << " Secondary " << world.rank() << " found (" << pwi.size() << " ) " << std::endl;
            for (auto& i : pwi) {
                queue.push_back(i);
            }
        }
    }

    bool checkQuit() {
        if (world.iprobe(0, TagType::QUIT)) {
            std::cout << " Secondary " << world.rank() << " found QUIT " << std::endl;
            world.recv(0, TagType::QUIT);
            quit = true;
            return true;
        }
        return false;
    }

    void run() {
        /// for some reason going through this loop will often cause iprobe crashes
        while (!quit) {
            recvMessages();    /// check for more workitems
            dowork();          /// do the work
            if (checkQuit()) { /// check for quit
                resolveSends();
                break;
            }
            /// Yield is insufficient
            std::this_thread::yield();
        }
    }
};
Primary.hpp
namespace mpi = boost::mpi;
class Primary {
    mpi::communicator world;
    JobHandler jh;
    std::list<mpi::request> sends;

    void resolveSends() {
        if (!sends.empty()) {
            sends.remove_if([](mpi::request req) { return req.test(); });
        }
    }

public:
    Primary() : jh(world.size()) {}

    static void runWorker() {
        Secondary worker(0);
        worker.run();
    }

    void runJobs() {
        std::vector<int> workerIds(world.size());
        std::iota(workerIds.begin(), workerIds.end(), 0);
        const int nworkers = static_cast<int>(workerIds.size());
        std::thread workerThread = std::thread(runWorker);
        std::list<WorkItem*> allitems = jh.getAllItems();
        int sendto;
        while (true) {
            /// Send all of our items
            if (!allitems.empty()) {
                std::vector<WorkItem*> v{ std::begin(allitems), std::end(allitems) };
                for (int i = 0, sendto = 1; i < nworkers; ++i, ++sendto) {
                    std::vector<WorkItem*> send = vecutil::split(v, nworkers, i);
                    std::cout << " >>> " << workerIds[sendto % nworkers] << " " << send.size() << " " << allitems.size() << std::endl;
                    sends.push_back(world.isend(workerIds[sendto % nworkers], TagType::WORK, send));
                    jh.sendingWorkTo(sendto, send.size());
                }
            }
            resolveSends();
            std::this_thread::yield();
            std::for_each(allitems.begin(), allitems.end(), vecutil::DeleteVector<WorkItem*>());
            allitems.clear();
            /// Check for done items
            for (auto& i : workerIds) {
                while (world.iprobe(i, TagType::WORK_STATUS)) {
                    int r;
                    mpi::request req = world.irecv(i, TagType::WORK_STATUS, r);
                    jh.workItemComplete(ReturnResult(i, r));
                    std::cout << jh << " <<< Secondary " << i << " remaining=" << r
                              << " complete=" << jh.isComplete() << std::endl;
                }
            }
            /// Check for complete
            if (jh.isComplete()) {
                std::cout << "====\n >>> complete " << std::endl;
                break;
            }
            std::this_thread::yield();
        }
        std::cout << "====================== sending quit " << std::endl;
        mpi::request reqs[nworkers];
        int n = 0;
        for (auto& i : workerIds) {
            reqs[n++] = world.isend(i, TagType::QUIT);
        }
        mpi::wait_all(reqs, reqs + nworkers);
        std::cout << "====================== gathering " << std::endl;
        workerThread.join();
        std::cout << "====================== quitting " << std::endl;
    }

    void addJob(Job* pjob) {
        jh.addJob(pjob);
    }
};
I can provide whatever additional code is needed, but I think this is the relevant section.
It seems that you are using the MPI library (wrapped by Boost.MPI) from multiple threads. In that case, you need to initialize the MPI library accordingly. You also need to be sure that the underlying MPI library supports threading, and at which level: MPI defines the threading levels single, funneled, serialized, and multiple. You can then request one of them, e.g.:
mpi::environment env(argc, argv, mpi::threading::multiple);
However, I can't find in the Boost.MPI documentation what happens in this case if the underlying MPI library doesn't support MPI_THREAD_MULTIPLE.
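What the MPI standard itself guarantees is that MPI_Init_thread reports the level it actually granted, which may be lower than the one requested, and Boost.MPI exposes that granted level through environment::thread_level(). Below is a minimal sketch of checking it at startup; the diagnostic message and the fallback strategy in the comments are placeholders, not part of your code:

#include <boost/mpi.hpp>
#include <iostream>

namespace mpi = boost::mpi;

int main(int argc, char* argv[]) {
    // Request full multi-threaded support, since both the main thread and the
    // worker thread touch the communicator concurrently.
    mpi::environment env(argc, argv, mpi::threading::multiple);
    mpi::communicator world;

    // MPI_Init_thread may grant a lower level than requested; Boost.MPI
    // exposes the granted level via environment::thread_level().
    if (mpi::environment::thread_level() != mpi::threading::multiple) {
        if (world.rank() == 0) {
            std::cerr << "MPI_THREAD_MULTIPLE is not available; "
                         "MPI calls must not be made concurrently." << std::endl;
        }
        // At this point you would either fall back to a single-threaded design,
        // or guard every MPI call with a mutex (which is only sufficient when
        // at least the serialized level was granted).
    }

    // ... run the Primary/Secondary loop as before ...
    return 0;
}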