user11793364

Reputation: 13

It seems that multiprocessing.Pool takes the same argument twice

I'm using multiprocessing.Pool to perform multiple integrations in parallel.

In this program I integrate an equation of motion for different realizations of the noise by generating the 3D array dW. The first part of the program just defines parameters and generates the arrays needed for the calculation.

I generate dW outside the function because I know that otherwise I would have to reseed each process to avoid getting the same random sequence.

The Euler(replica) function is the one to be parallelized. It contains a for loop, run within a single process, that performs the numerical integration. The argument replica is the index of the replica of my system as stored in the replicas array, which is the iterable passed to pool.map.

import numpy as np 
from multiprocessing import Pool

# parameters
N = 30               # number of sites
T = 1                # total time
dt = 0.1             # time step
l = 0                # initially localized state on site l
e = 0.0              # site energy
v = 1.0              # hopping coefficient
mu, sigma = 0, 1.0   # average and variance of the gaussian distribution
num_replicas = 8     # number of replicas of the system
processes=2          # number of processes

# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e

# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v

# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)

# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v

# time array
time_array = np.arange(0, T, dt)

# site array
site_array = np.arange(N)

# initial state
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j

#initialization of the state array
Psi = np.zeros((len(time_array), N), dtype=complex)
Psi[0,:] = psi_0 

# replicas 1D array
replicas = np.arange(0, num_replicas)

# random 3D array of noise increments (time step, replica, site)
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)

def Euler(replica):
    psi_0 = np.zeros((N), dtype=complex)
    psi_0[l] = 1. + 0.j
    psi = psi_0
    for i in np.arange(1, len(time_array)):
        psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
        psi /= np.sqrt(psi @ np.conj(psi))
        Psi[i,:] = psi
    return Psi

pool = Pool(processes)
Psi = pool.map(Euler, replicas)

Psi = np.asarray(Psi)

Psi = np.swapaxes(Psi,0,1)

print(Psi)

Empirically I found that if num_replicas > 4 * processes (with processes as passed to Pool), two processes seem to receive the same argument, as if the same calculation were repeated twice. Instead, for num_replicas <= 4 * processes I get the expected result: each replica's result differs from the others.

This is not due to the generation of the random array dW, since each row is uncorrelated, so I ascribe this behavior to my use of multiprocessing.Pool.

Upvotes: 1

Views: 338

Answers (2)

Sam Mason

Reputation: 16174

As @Fabrizio pointed out, Psi is shared between invocations of Euler. This is generally a bad thing to do, and another example of why global mutable state is a bad idea: it's just too easy for things to break in unexpected ways.

The reason it fails in this case is subtle, and due to the way Pool.map accumulates several results in each worker process before pickling them and returning them to the parent/controlling process. By default, map picks a chunk size of roughly len(iterable) / (4 * processes), which is why the problem only appears once num_replicas exceeds 4 * processes. You can see this by setting the chunksize parameter of map to 1, causing it to return each result immediately and hence not overwrite intermediate results.
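The observed threshold matches how Pool.map picks its default chunk size. A sketch mirroring CPython's computation (this is an implementation detail, so treat the exact formula as an assumption that may change between versions):

```python
def default_chunksize(n_items, n_processes):
    # mirrors CPython's Pool._map_async default:
    #   chunksize, extra = divmod(len(iterable), len(pool) * 4)
    #   if extra: chunksize += 1
    chunksize, extra = divmod(n_items, n_processes * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(8, 2))   # 1: each result is returned on its own
print(default_chunksize(10, 2))  # 2: two results share a chunk and can clobber each other
```

With 8 replicas and 2 processes the chunk size is 1, so every result is pickled immediately; with 10 replicas it becomes 2, and the later call in each chunk overwrites the shared array before the pair is pickled.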

It's equivalent to the following minimal working example:

from multiprocessing import Pool

arr = [None]

def mutate_global(i):
    arr[0] = i
    return arr

with Pool(2) as pool:
    out = pool.map(mutate_global, range(10), chunksize=5)

print(out)

The last time I ran this I got:

[[4], [4], [4], [4], [4], [9], [9], [9], [9], [9]]

You can change the chunksize parameter to get an idea of what it's doing, or run the following version:

def mutate_local(i):
    arr = [None]
    arr[0] = i
    return arr

which "just works", and is equivalent to what @Fabrizio describes: creating Psi inside Euler rather than using a single global variable.
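As a quick check of the chunksize explanation, here is the shared-state version rerun with chunksize=1 (a sketch: each result is pickled and sent back to the parent before the next task mutates the shared list, so nothing gets overwritten):

```python
from multiprocessing import Pool

arr = [None]

def mutate_global(i):
    # mutates the module-level list, as in the failing example above
    arr[0] = i
    return arr

if __name__ == "__main__":
    with Pool(2) as pool:
        # chunksize=1: every result is pickled immediately after the call
        out = pool.map(mutate_global, range(10), chunksize=1)
    print(out)  # [[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]]
```

This avoids the symptom, but returning a fresh object per call (as in mutate_local) is the more robust fix, since it doesn't depend on batching behavior.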

Upvotes: 2

Fabrizio

Reputation: 939

I think you should initialize psi_0 and Psi inside the Euler function. I tried to reproduce your results and, indeed, I found that when num_replicas > 4 * processes you get the same results from multiple processes. But I think this is because, in your case, Psi is a global variable.

Modifying the code as follows gives a different result for each replica (by the way, why do you define site_array? It is not used anywhere).

import numpy as np 
from multiprocessing import Pool

# parameters
N = 3              # number of sites
T = 1              # total time
dt = 0.1             # time step
l = 0                # initially localized state on site l
e = 0.0              # site energy
v = 1.0              # hopping coefficient
mu, sigma = 0, 1.0   # average and variance of the gaussian distribution
num_replicas = 10    # number of replicas of the system
processes=2          # number of processes

# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e

# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v

# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)

# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v

# time array
time_array = np.arange(0, T, dt)

## site array
#site_array = np.arange(N)

# replicas 1D array
replicas = np.arange(0, num_replicas)

# random 3D array of noise increments (time step, replica, site)
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)

def Euler(replica):
    # initial state
    psi_0 = np.zeros((N), dtype=complex)
    psi_0[l] = 1. + 0.j

    # initialization of the state array
    Psi = np.zeros((len(time_array), N), dtype=complex)
    Psi[0,:] = psi_0
    psi = psi_0

#    print(dW[0,replica,0])
    for i in np.arange(1, len(time_array)):
        psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
        psi /= np.sqrt(psi @ np.conj(psi))
        Psi[i,:] = psi
    return Psi


pool = Pool(processes)
Psi = pool.map(Euler, replicas)

Psi = np.asarray(Psi)

Psi = np.swapaxes(Psi,0,1)

print(Psi)
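One way to sanity-check the fix (my own suggestion, not part of the answer above) is to assert that no two replica trajectories coincide after the swapaxes call; a sketch using stand-in random data with the same (time, replica, site) layout:

```python
import numpy as np

# stand-in data with the same (time, replica, site) layout as Psi above
rng = np.random.default_rng(0)
Psi_check = rng.normal(size=(10, 8, 3))

# every pair of replicas should differ when each received independent noise
for a in range(Psi_check.shape[1]):
    for b in range(a + 1, Psi_check.shape[1]):
        assert not np.allclose(Psi_check[:, a, :], Psi_check[:, b, :])
print("all replicas differ")
```

With the buggy global Psi, pairs of replicas inside the same chunk would fail this check.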

Upvotes: 1
