New Developer

Reputation: 89

Multiprocessing for a stochastic process with multiple arguments

I want to solve a stochastic differential equation using multiprocessing. A simplified, non-parallel version of the code looks like this:

import numpy as np

x       = np.zeros((2, 3, 4))                     #matrix
z       = np.random.normal(0, 1, (2,3,4))         #noise
z_array = z
for i in range(2):
    for j in range(3):
        x[i,j,0] = i
        for k in range(3):
            x[i,j,k+1] = x[i,j,k]*z_array[i,j,k]

The outputs are the noise z_array and the corresponding matrix x. I want to use multiprocessing for the second loop (over j). The problem is that I don't know how to incorporate the noise z into the parallel code. A naive implementation looks like this:

import os
import numpy     as np
import functools as ft
from multiprocess import Pool 

def fun(i, k):
    x = np.zeros(4)
    x[0] = i
    for k in range(2):
        z = np.random.normal(0, 1)
        x[k+1] = x[k]*z
    return x

if __name__=='__main__':
    pool  = Pool(os.cpu_count()-1)
    x   = np.zeros((2, 3, 4))
    for i in range(2):
        result = np.array(pool.map(ft.partial(fun, i), range(3)))
        x[i] = result           
    pool.close()
    pool.join()

Since the code involves random numbers, I am not sure whether the parallel code is correct, and I don't know how to recover the noise z that was used. Any ideas?

Upvotes: 0

Views: 95

Answers (1)

stochastic13

Reputation: 423

You can pre-generate the noise z in the main process and pass the relevant slice to the worker function, packed into a tuple together with the indices. That way the noise is already available in the main process and the function does not need to generate it. You can also put the index i of the first loop into the same tuple, so that both outer loops run through the multiprocessing pool.
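If you would rather draw the noise inside the workers, one option is to hand each task its own seed and return the noise together with the path. This matters because, depending on the start method, workers may inherit the same global NumPy random state and produce identical draws. The following is only a sketch of that alternative, not the approach used in the main code; the names simulate_with_own_noise and make_tasks and the root seed 12345 are made up for illustration.

```python
import numpy as np
from multiprocessing import Pool


def simulate_with_own_noise(args):
    """Draw the noise for one (i, j) path from its own seed and return both."""
    i, seed_seq = args
    rng = np.random.default_rng(seed_seq)  # independent generator per task
    z_row = rng.normal(0, 1, 4)
    path = np.zeros(4)
    path[0] = i
    for k in range(3):
        path[k + 1] = path[k] * z_row[k]
    return path, z_row


def make_tasks(root_seed, n_i=2, n_j=3):
    """Build one (i, child seed) tuple per (i, j) pair, in row-major order."""
    children = np.random.SeedSequence(root_seed).spawn(n_i * n_j)
    return [(i, children[i * n_j + j]) for i in range(n_i) for j in range(n_j)]


if __name__ == '__main__':
    with Pool(2) as pool:
        results = pool.map(simulate_with_own_noise, make_tasks(12345))
    # pool.map keeps the task order, so a plain reshape restores (i, j, k).
    x = np.array([path for path, _ in results]).reshape(2, 3, 4)
    z = np.array([noise for _, noise in results]).reshape(2, 3, 4)
```

With a fixed root seed the run is reproducible, and z holds exactly the noise that produced x.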

Notes on the main code below:

  1. In the second snippet you wrote, the k loop inside fun runs over range(2). I assume that is a typo, so I use range(3) as in the original code.
  2. I have moved the first loop (over i) into the multiprocessing setup as well.
  3. If memory is not an issue and the matrix is small, use the option below: it is cleaner, and the equivalence between your original code and the multiprocessing code is easier to see. Note that every call to fun allocates a full (2, 3, 4) array with only one row filled in. If memory is an issue, you can compute only the smaller pieces inside fun and then reshape the result rather than summing (let me know if you want that solution).

Main code:

import os
import numpy as np
from multiprocessing import Pool 

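def fun(t):
    # t = (i, j, z[i, j, :]): the two indices plus the pre-generated noise row
    i, j, z = t
    # Full-size array in which only the (i, j) row is filled; the rest stays 0
    x = np.zeros((2, 3, 4))
    x[i, j, 0] = i
    for k in range(3):
        x[i, j, k + 1] = x[i, j, k] * z[k]
    return x


if __name__ == '__main__':
    # Noise is generated once in the main process, so it stays available here
    z = np.random.normal(0, 1, (2, 3, 4))
    # Leave one core free, but never ask for fewer than one worker
    pool = Pool(max(1, os.cpu_count() - 1))
    map_args = ((i, j, z[i, j, :]) for i in range(2) for j in range(3))
    result = np.array(pool.map(fun, map_args))
    # Each result is zero outside its own (i, j) row, so summing merges them
    x = np.sum(result, 0)
    pool.close()
    pool.join()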
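
The memory-lean alternative mentioned in point 3 could look roughly like the sketch below: each worker returns only its own path of length 4, and the results are reshaped instead of summed. This relies on pool.map returning results in task order. The names simulate_path and build_tasks are made up for this example.

```python
import os
import numpy as np
from multiprocessing import Pool


def simulate_path(args):
    """Return the single path x[i, j, :] for one (i, noise row) task."""
    i, z_row = args
    n = len(z_row)
    path = np.zeros(n)
    path[0] = i
    for k in range(n - 1):
        path[k + 1] = path[k] * z_row[k]
    return path


def build_tasks(z):
    """One (i, z[i, j, :]) tuple per (i, j) pair, in row-major order."""
    n_i, n_j, _ = z.shape
    return [(i, z[i, j, :]) for i in range(n_i) for j in range(n_j)]


if __name__ == '__main__':
    z = np.random.normal(0, 1, (2, 3, 4))
    n_workers = max(1, (os.cpu_count() or 2) - 1)
    with Pool(n_workers) as pool:
        rows = pool.map(simulate_path, build_tasks(z))
    # rows has one entry per (i, j) in row-major order, so reshape suffices.
    x = np.array(rows).reshape(z.shape)
```

Each task now transfers and allocates 4 numbers instead of a full 2 x 3 x 4 array, which should matter more as the matrix grows.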

Upvotes: 1
