Reputation: 13
I'm using multiprocessing.Pool to perform multiple integrations in parallel. In this program I integrate an equation of motion for different realizations of the noise by generating the dW 3D array. The first part of the program is just the definition of the parameters and the generation of the arrays needed for the calculation. I generate dW outside the function because otherwise I would have to reseed in each process to avoid getting the same random sequence everywhere.
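(For comparison, generating the noise inside the worker would require something like the sketch below, with an explicit per-replica seed; base_seed is just a hypothetical example value, and the other parameters are the ones defined in the full code further down.)

def make_noise(replica, base_seed=12345):
    # hypothetical per-replica seeding so that each worker draws a different sequence
    rng = np.random.default_rng(base_seed + replica)
    return rng.normal(mu, 1.0, (len(time_array), N)) * np.sqrt(dt)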
The Euler(replica) function is the one that has to be parallelized. It contains a for loop, run inside a single process, that performs the numerical integration. The argument replica is the index of the replica of my system as stored in the "replicas" array, which is the iterable passed to pool.map.
import numpy as np
from multiprocessing import Pool
# parameters
N = 30 # number of sites
T = 1 # total time
dt = 0.1 # time step
l = 0 # initially localized state on site l
e = 0.0 # site energy
v = 1.0 # hopping coefficient
mu, sigma = 0, 1.0 # average and variance of the gaussian distribution
num_replicas = 8 # number of replicas of the system
processes=2 # number of processes
# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e
# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v
# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)
# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v
# time array
time_array = np.arange(0, T, dt)
# site array
site_array = np.arange(N)
# initial state
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
#initialization of the state array
Psi = np.zeros((len(time_array), N), dtype=complex)
Psi[0,:] = psi_0
# replicas 1D array
replicas = np.arange(0, num_replicas)
# random 3D array
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
def Euler(replica):
    psi_0 = np.zeros((N), dtype=complex)
    psi_0[l] = 1. + 0.j
    psi = psi_0
    for i in np.arange(1, len(time_array)):
        psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
        psi /= np.sqrt(psi @ np.conj(psi))
        Psi[i,:] = psi
    return Psi
pool = Pool(processes)
Psi = pool.map(Euler, replicas)
Psi = np.asarray(Psi)
Psi = np.swapaxes(Psi,0,1)
print(Psi)
Empirically I found that if num_replicas > 4 * processes (where processes is the number given to Pool), two processes seem to receive the same argument, as if the same calculation were repeated twice. Instead, for num_replicas <= 4 * processes I get the expected result: each result is different from the others.
This is not due to the generation of the random array dW, since each row is uncorrelated, so I ascribe this behavior to my use of multiprocessing.Pool.
Upvotes: 1
Views: 338
Reputation: 16174
As @Fabrizio pointed out, Psi is shared between invocations of Euler. This is generally a bad thing to do and another example of why it's a bad idea to have "global mutable state": it's just too easy for things to break in unexpected ways.
The reason it fails in this case is subtle, and due to the way Pool.map accumulates several results in each worker process before pickling them and returning them to the parent/controlling process. You can see this by setting the chunksize parameter of map to 1, which makes it return each result immediately and hence not overwrite intermediate results.
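For example, with the code in the question you would only change the map call, keeping everything else the same:

Psi = pool.map(Euler, replicas, chunksize=1)  # each single result is pickled back before the worker reuses the global Psi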
The problem is equivalent to the following minimal working example:
from multiprocessing import Pool

arr = [None]

def mutate_global(i):
    arr[0] = i
    return arr

with Pool(2) as pool:
    out = pool.map(mutate_global, range(10), chunksize=5)

print(out)
The last time I ran this I got:
[[4], [4], [4], [4], [4], [9], [9], [9], [9], [9]]
You can change the chunksize parameter to get an idea of what it's doing, or run with the following version:
def mutate_local(i):
    arr = [None]
    arr[0] = i
    return arr
which "just works", and is the equlivelant to doing what @Fabrizio describes where you create Phi
inside Euler
rather than using a single global variable
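For instance, running the same driver with the local version should return each index untouched:

with Pool(2) as pool:
    out = pool.map(mutate_local, range(10), chunksize=5)

print(out)  # expected: [[0], [1], [2], ..., [9]]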
Upvotes: 2
Reputation: 939
I think you should initialize your psi_0 and Psi inside the Euler function.
I tried to reproduce your results and, indeed, I found that when num_replicas > 4 * processes you get the same results from multiple processes. But I think this is because Psi, in your case, is a global variable.
Modifying the code as follows gives different results for each replica (by the way, why do you use site_array? It is not used anywhere):
import numpy as np
from multiprocessing import Pool
# parameters
N = 3 # number of sites
T = 1 # total time
dt = 0.1 # time step
l = 0 # initially localized state on site l
e = 0.0 # site energy
v = 1.0 # hopping coefficient
mu, sigma = 0, 1.0 # average and variance of the gaussian distribution
num_replicas = 10 # number of replicas of the system
processes=2 # number of processes
# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e
# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v
# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)
# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v
# time array
time_array = np.arange(0, T, dt)
## site array
#site_array = np.arange(N)
# replicas 1D array
replicas = np.arange(0, num_replicas)
# random 3D array
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
def Euler(replica):
    # initial state
    psi_0 = np.zeros((N), dtype=complex)
    psi_0[l] = 1. + 0.j
    # initialization of the state array, local to this replica
    Psi = np.zeros((len(time_array), N), dtype=complex)
    Psi[0,:] = psi_0
    psi = psi_0
    # print(dW[0,replica,0])
    for i in np.arange(1, len(time_array)):
        psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
        psi /= np.sqrt(psi @ np.conj(psi))
        Psi[i,:] = psi
    return Psi
pool = Pool(processes)
Psi = pool.map(Euler, replicas)
Psi = np.asarray(Psi)
Psi = np.swapaxes(Psi,0,1)
print(Psi)
Upvotes: 1