Reputation: 469
How can we update one global dictionary in MPI (specifically mpi4py) across different processors? The issue that I am encountering now after broadcasting is that different processors cannot see the changes (updates) made to the dictionary by the other processors.
For example, the input data is as follows:
col1  col2
----------
a     1
a     1
b     2
c     3
c     1
the output dictionary should be as follows:
{'a': 2, 'b': 2, 'c': 4}
which means the col2 values in the input are summed to create the value for each key (col1). The dictionary is initially empty and gets updated by all the processors during the parallel processing (at least, this is what we're trying to do).
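For reference, a plain serial version of this aggregation (on the sample rows above) would be:

```python
# Serial version of the aggregation: sum col2 grouped by col1.
rows = [('a', 1), ('a', 1), ('b', 2), ('c', 3), ('c', 1)]

result = {}
for key, value in rows:
    result[key] = result.get(key, 0) + value

print(result)  # → {'a': 2, 'b': 2, 'c': 4}
```

This is the output we want to reproduce when the rows are processed in parallel.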
Upvotes: 1
Views: 682
Reputation: 51393
How can we update one global dictionary in MPI (specifically mpi4py) across different processors? The issue that I am encountering now after broadcasting is that different processors cannot see the changes (updates) made to the dictionary by the other processors.
First, you need to understand that in MPI, each MPI process runs a complete copy of the program. Consequently, all the data allocated by that program is private to each process.
Let us look at the following example:
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    # collect and merge the partial dictionaries from the other processes
    dictionary = {'a': 1, 'c': 3}
    for i in range(1, size, 1):
        data = comm.recv(source=i, tag=11)
        for key in data:
            if key in dictionary:
                dictionary[key] = dictionary[key] + data[key]
            else:
                dictionary[key] = data[key]
    print(dictionary)
else:
    # every other process sends its own (private) dictionary to process 0
    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)
In this code, the process with rank == 0 allocates a dictionary, which is private to that process, in the same way that data = {'a': 1, 'b': 2, 'c': 1} is private to each of the other processes. If (for instance) a process changes the variable size, that change will not be visible to the other processes.
In this code, all processes send their own dictionary:

data = {'a': 1, 'b': 2, 'c': 1}
comm.send(data, dest=0, tag=11)

to process 0, which calls comm.recv once for each of the other processes:

for i in range(1, size, 1):
    data = comm.recv(source=i, tag=11)
and merges the data received (from the other processes) into its own dictionary:
for key in data:
    if key in dictionary:
        dictionary[key] = dictionary[key] + data[key]
    else:
        dictionary[key] = data[key]
In the end, only process 0 has the complete dictionary. The same happened to you when you did the broadcasting. Nevertheless, MPI does have routines (e.g., comm.Allgather) that allow you to have the entire dictionary in all the processes.
An example of such code (you just need to adapt it to a dictionary):
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
sendBuffer = numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(size, dtype=bool)
print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allgather([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL])
print("After Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
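To adapt this to the dictionary case, note that mpi4py also provides a lowercase comm.allgather, which pickles arbitrary Python objects and returns, on every rank, a list containing each rank's contribution. A minimal sketch of the merge step (the allgather call itself appears only as a comment, since it needs an MPI launch; the names local_dict and partial_dicts are illustrative):

```python
from collections import Counter

# Under MPI, every rank would call:
#   partial_dicts = comm.allgather(local_dict)
# which leaves the list of all per-rank dictionaries on every rank.
def merge_dicts(partial_dicts):
    total = Counter()
    for d in partial_dicts:
        total.update(d)  # Counter.update *adds* values for repeated keys
    return dict(total)

# Simulated allgather result for three ranks:
partial_dicts = [{'a': 1, 'c': 3}, {'a': 1, 'b': 2}, {'c': 1}]
print(merge_dicts(partial_dicts))  # → {'a': 2, 'c': 4, 'b': 2}
```

After merging, every rank holds the same complete dictionary.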
The dictionary initially is empty and is getting updated during the parallel processing by all the processors (at least this is what we’re trying to do).
With the aforementioned model (i.e., the distributed-memory paradigm), you would need to explicitly communicate with all the processes every time one of them changed the dictionary. This means that you would have to know beforehand the points in the code where those communications should take place.
However, based on your text, you seem to want a shared-memory approach, where a process would update the dictionary, for instance as follows:

if key in dictionary:
    dictionary[key] = dictionary[key] + data[key]
else:
    dictionary[key] = data[key]
and those changes would be immediately visible to all processes, just as happens in multithreaded code.
MPI 3.0 introduced the concept of shared memory, with which one can actually achieve that.
Here is an example using arrays:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = 1000
itemsize = MPI.DOUBLE.Get_size()

if comm.Get_rank() == 0:
    nbytes = size * itemsize
else:
    nbytes = 0

# rank 0 allocates the shared block; the other ranks allocate nothing
win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=comm)

# every rank queries the address of the shared block and wraps it in a
# numpy array, so all ranks read and write the same memory
buf, itemsize = win.Shared_query(0)
assert itemsize == MPI.DOUBLE.Get_size()
buf = np.array(buf, dtype='B', copy=False)
ary = np.ndarray(buffer=buf, dtype='d', shape=(size,))

# rank 1 writes; after the barrier, rank 0 sees the written values
if comm.rank == 1:
    ary[:5] = np.arange(5)
comm.Barrier()
if comm.rank == 0:
    print(ary[:10])
The code is not mine, it comes from here.
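A Python dict cannot live directly inside such a shared-memory window, but if the set of keys is known up front, you can map each key to a slot of the shared array and let ranks add into it. A sketch of that mapping (plain numpy here, standing in for the window buffer ary above; the key set and rows are illustrative assumptions):

```python
import numpy as np

keys = ['a', 'b', 'c']                     # assumed global key set
index = {k: i for i, k in enumerate(keys)}

ary = np.zeros(len(keys))                  # stands in for the shared window buffer
rows = [('a', 1), ('a', 1), ('b', 2), ('c', 3), ('c', 1)]  # one rank's rows
for key, value in rows:
    ary[index[key]] += value               # real code must synchronize ranks
                                           # (e.g. win.Lock/win.Unlock or a Barrier)

print(dict(zip(keys, ary.tolist())))       # → {'a': 2.0, 'b': 2.0, 'c': 4.0}
```

The trade-off is that the key set must be fixed in advance, whereas a real dictionary can grow.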
Upvotes: 3