Reputation: 5349
I am writing an MPI program using Python (mpi4py). Many processes compute partial results and send both the index and the update to the master task. My code that gathers all the data is:
if rank == 0:
    cb = dict((v,0) for v in graph)
    #print "initial is",cb
    while True:
        neww = comm.recv(source=ANY_SOURCE, tag=1)
        newdeltaw = comm.recv(source=ANY_SOURCE, tag=2)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
    print "cb=",cb
But there is a race condition here which affects my results for large numbers of processors: in cb[neww]=cb[neww]+newdeltaw, the data for neww and newdeltaw may come from different processes. How do I prevent this?
Upvotes: 2
Views: 437
Reputation: 50937
While MPI has an in-order guarantee, in the sense that two messages from rank 1 to rank 0 will be received by rank 0 in the order they were sent (one message cannot overtake the other), MPI says nothing, and can say nothing, about how they will be interleaved with messages from other processors. So you can easily get situations like:
rank 1 messages to rank 0: [src 1, msg A, tag 1], [src 1, msg B, tag 2]
rank 2 messages to rank 0: [src 2, msg C, tag 1], [src 2, msg D, tag 2]
rank 0 message queue: [src 1, msg A, tag 1], [src 2, msg C, tag 1], [src 2, msg D, tag 2], [src 1, msg B, tag 2]
So rank 0, extracting a message with tag 1, will get rank 1's msg A, but then with tag 2 will get rank 2's msg D. (Note that the message queue above still satisfies the in-order guarantee, but that doesn't help us here.)
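As a quick illustration, here is a minimal sketch (assuming the same comm, tags, and one index/update pair per worker as in the question) that receives both messages with ANY_SOURCE and uses MPI.Status to show that the pair can come from different senders:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    stat1 = MPI.Status()
    stat2 = MPI.Status()
    # receive the index and the update separately, as in the question
    neww = comm.recv(source=MPI.ANY_SOURCE, tag=1, status=stat1)
    newdeltaw = comm.recv(source=MPI.ANY_SOURCE, tag=2, status=stat2)
    # with several senders, the two sources need not match
    if stat1.Get_source() != stat2.Get_source():
        print "mismatched pair: neww came from rank", stat1.Get_source(), \
              "but newdeltaw came from rank", stat2.Get_source()
else:
    comm.send(rank, dest=0, tag=1)      # the index
    comm.send(rank, dest=0, tag=2)      # the update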
There are a few ways around this. One is to filter the messages received for newdeltaw not just by tag but also by source, to make sure it comes from the same task that sent the neww:
# (assumes the usual mpi4py setup: from mpi4py import MPI, import numpy,
#  comm = MPI.COMM_WORLD, rank = comm.Get_rank(), size = comm.Get_size())
if rank == 0:
    cb = numpy.zeros(size)
    rstat = MPI.Status()
    for i in range((size-1)*3):
        neww = comm.recv(source=MPI.ANY_SOURCE, tag=1, status=rstat)
        src = rstat.Get_source()
        newdeltaw = comm.recv(source=src, tag=2)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
    print "cb=",cb
else:
    data = rank
    for i in range(3):
        comm.send(rank,dest=0,tag=1)
        comm.send(data,dest=0,tag=2)
This way, only the tag-2 newdeltaw message from the matching source is received, avoiding the inconsistency.
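A closely related variant is to probe for the next tag-1 message first, note its source, and then receive both messages from that specific source, so that neither recv uses ANY_SOURCE. A sketch of the receive loop (assuming the same cb, size, and comm as above):
rstat = MPI.Status()
for i in range((size-1)*3):
    # peek at the next tag-1 message to learn who sent it
    comm.Probe(source=MPI.ANY_SOURCE, tag=1, status=rstat)
    src = rstat.Get_source()
    # now receive the matched pair from that one source
    neww = comm.recv(source=src, tag=1)
    newdeltaw = comm.recv(source=src, tag=2)
    cb[neww] = cb[neww] + newdeltaw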
Another approach is to avoid splitting the messages at all, by putting both pieces of data into the same message:
if rank == 0:
    cb = numpy.zeros(size)
    for i in range((size-1)*3):
        (neww,newdeltaw) = comm.recv(source=MPI.ANY_SOURCE, tag=1)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
    print "cb=",cb
else:
    data = rank
    for i in range(3):
        comm.send((rank,data),dest=0,tag=1)
This bundles both pieces of data into one message, so they can't be separated. Note that once this is working, you can use the more efficient lower-level mpi4py routines to avoid serializing the tuples:
if rank == 0:
    cb = numpy.zeros(size)
    for i in range((size-1)*3):
        dataarr = numpy.zeros(2,dtype='i')
        comm.Recv([dataarr,MPI.INT],source=MPI.ANY_SOURCE, tag=1)
        neww = dataarr[0]
        newdeltaw = dataarr[1]
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
    print "cb=",cb
else:
    data = rank
    for i in range(3):
        senddata = numpy.array([rank,data],dtype='i')
        comm.Send([senddata, MPI.INT],dest=0,tag=1)
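The difference from the earlier versions is that the uppercase Send/Recv calls work on buffer-like objects such as NumPy arrays, so the two integers travel as raw data rather than as a pickled Python tuple; the dtype='i' array is what corresponds to the MPI.INT datatype in the call.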
Finally, you can avoid the master/slave approach entirely and have all processors work on their partial results locally, then combine all the results at the end with a reduce operation:
cb = numpy.zeros(size,dtype='i')
totals = numpy.zeros(size,dtype='i')
data = rank
for i in range(3):
    cb[rank] = cb[rank] + data
comm.Reduce([cb,MPI.INT], [totals,MPI.INT], op=MPI.SUM, root=0)
if rank == 0:
    print "result is", totals
Upvotes: 2