Reputation: 2385
In my scenario, I have an environment that I'm trying to sample from. Each process will sample multiple times from this environment.
import numpy as np

class EnvSim(object):
    @staticmethod
    def get():
        return np.random.randint(0, 2000)

from collections import defaultdict

class Dict(object):
    def __init__(self):
        self.d = defaultdict(int)

    def update(self, key):
        self.d[key] += 1
        print(key)

        data_array = [np.empty(1, dtype=np.int) for _ in range(num_cpu)]
        data_array[proc_id()] = np.array([key], dtype=np.int)
        MPI.COMM_WORLD.Bcast(data_array[proc_id()], root=proc_id())
        for data in data_array:
            self.d[data.tolist()[0]] += 1
The goal is for each OpenMPI process to share what it has sampled from the environment, either synchronously or asynchronously. Is Bcast the right method to use here, or should I use something else?

This is the main code I use to execute my program; currently it is not working:
import os
import subprocess
import sys

def mpi_fork(n, bind_to_core=False):
    """
    Re-launches the current script with workers linked by MPI.

    Args:
        n (int): Number of processes to split into.
        bind_to_core (bool): Bind each MPI process to a core.
    """
    if n <= 1:
        return
    if os.getenv("IN_MPI") is None:
        env = os.environ.copy()
        env.update(
            MKL_NUM_THREADS="1",
            OMP_NUM_THREADS="1",
            IN_MPI="1",
        )
        args = ["mpirun", "-np", str(n)]
        if bind_to_core:
            args += ["-bind-to", "core"]
        args += [sys.executable] + sys.argv
        subprocess.check_call(args, env=env)
        sys.exit()

if __name__ == '__main__':
    num_cpu = 3
    mpi_fork(num_cpu)

    dic = Dict()
    for _ in range(3):
        exp = EnvSim.get()
        dic.update(exp)

    print(dic.d)
Upvotes: 1
Views: 393
Reputation: 978
I'm not sure what you mean by "synchronously or asynchronously", so I'll just focus on the synchronous case here.
If you want all ranks to sample and send to everyone, then I think you want Alltoall instead of Bcast.
Below is an example script where each rank samples N values from the interval (rank, rank+1), where N is the size of the communicator.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# each rank draws `size` samples from (rank, rank+1) and exchanges
# one sample with every rank (including itself)
senddata = np.random.uniform(rank, rank+1, size)
recvdata = np.empty(size, dtype=float)
comm.Alltoall(senddata, recvdata)

print("Rank %s sent %s and received %s" % (rank, senddata, recvdata))
Instead of having the script launch itself, can you just call it directly from the command line with:
$ mpirun -np 3 python test.py
and you should see output such as
Rank 0 sent [0.37362478 0.74304362 0.25090876] and received [0.37362478 1.81852273 2.48959575]
Rank 1 sent [1.81852273 1.65782547 1.85142608] and received [0.74304362 1.65782547 2.23064501]
Rank 2 sent [2.48959575 2.23064501 2.644848 ] and received [0.25090876 1.85142608 2.644848 ]
This can be included in a for loop, if multiple rounds of sampling/communication are desired.
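For example, here is a minimal sketch of such a loop, tallying the received samples in a defaultdict the way the question's Dict class does (num_rounds and the 0-2000 sampling range are assumptions carried over from the question):

from mpi4py import MPI
from collections import defaultdict
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

counts = defaultdict(int)  # plays the role of Dict.d from the question
num_rounds = 3             # assumed number of sampling rounds

for _ in range(num_rounds):
    # each rank draws one sample per peer and exchanges them, as above
    senddata = np.random.randint(0, 2000, size)
    recvdata = np.empty(size, dtype=senddata.dtype)
    comm.Alltoall(senddata, recvdata)
    for value in recvdata:
        counts[int(value)] += 1

print("rank %d counts: %s" % (rank, dict(counts)))

Each rank ends up counting one sample from every peer per round; if instead every rank should see the identical set of samples, Allgather would be the collective to reach for.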
If there is some expectation of variability in the times to sample, then you could have rank zero be a master and perform non-blocking queries of each of the remaining ranks. For example:
from mpi4py import MPI
import numpy as np
from time import sleep

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

tag_denoting_ready_to_send = 1
while 1:
    if rank == 0:
        if comm.Iprobe(source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send):
            buffer_for_receiving = np.empty(1, dtype='i')
            comm.Recv([buffer_for_receiving, MPI.INT], source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send)
            print(buffer_for_receiving[0])
    else:
        sleep(rank * np.random.uniform())
        send_buffer = np.array(rank, dtype='i')
        comm.Send([send_buffer, MPI.INT], dest=0, tag=tag_denoting_ready_to_send)
Each non-zero rank sleeps for a random time and then Sends its rank in a buffer to rank 0 (which prints it out). Again, running with
$ mpirun -np 20 python test2.py
should yield output such as:
13
6
1
1
2
7
1
2
1
4
1
8
3
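Note that the while 1 loop above never exits. If each worker sends a fixed number of messages, rank 0 can count how many to expect and stop cleanly; here is a minimal sketch of that variant (messages_per_worker is an assumption, and a blocking Recv replaces the Iprobe polling):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

tag = 1
messages_per_worker = 5  # assumed, agreed on by all ranks in advance

if rank == 0:
    # rank 0 knows exactly how many messages to expect, so it can exit cleanly
    expected = messages_per_worker * (size - 1)
    buf = np.empty(1, dtype='i')
    for _ in range(expected):
        comm.Recv([buf, MPI.INT], source=MPI.ANY_SOURCE, tag=tag)
        print(buf[0])
else:
    for _ in range(messages_per_worker):
        send_buf = np.array([rank], dtype='i')
        comm.Send([send_buf, MPI.INT], dest=0, tag=tag)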
Upvotes: 1