Reputation: 474
I'm trying to perform numerical integration on a large array, and the computation takes a very long time. I tried to speed up my code with numba and the jit decorator, but numpy.trapz isn't supported.
My new idea would be to create n threads to run the calculations in parallel, but I was wondering how I could do this, or whether it's even feasible.
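(For what it's worth, I know the trapezoid rule itself is easy to write by hand so that numba can compile it; a minimal 1-D sketch is below, though I'd still have to adapt it to the axis-based np.trapz calls in my function.)

import numpy as np
from numba import njit

@njit
def trapz_1d(y, x):
    # hand-rolled trapezoidal rule over matching 1-D arrays
    total = 0.0
    for k in range(x.shape[0] - 1):
        total += 0.5 * (x[k + 1] - x[k]) * (y[k + 1] + y[k])
    return total

# usage: trapz_1d(np.sin(np.linspace(0, np.pi, 1000)), np.linspace(0, np.pi, 1000)) ~= 2.0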
Referencing the code below:
Can I make sz[2] threads run at the same time, each calling ZO_SteadyState to calculate values?
for i in range(sz[1]):
    phii = phi[i]
    for j in range(sz[2]):
        s = tau[0, i, j, :].reshape(1, n4)
        [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
        BCoeff = Bessel0(bm * R3)
        SS[0, i, j] = ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii,
                                     PHI2, PHI3, phiprime, R3, rprime, s, S3, T, v)
The calculation in question:
@jit()
def ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii, PHI2, PHI3,
                   phiprime, R3, rprime, s, S3, T, v):
    # stationary point heat source
    g = 1000000 * exp(-(10 ** 5) * (R3 - (b / maxt) * S3) ** 2) * (
            exp(-(10 ** 5) * (PHI3 - 0) ** 2)
            + exp(-(10 ** 5) * (PHI3 - 2 * np.pi) ** 2)
            + exp(-(10 ** 5) * (PHI3 - 2 * np.pi / 3) ** 2)
            + exp(-(10 ** 5) * (PHI3 - 4 * np.pi / 3) ** 2))
    y = R3 * ((np.sqrt(2) / b) * (1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2))))))
              * (BCoeff / Bessel_Denom)) * np.cos(v * (phii - PHI3)) * g
    # integral transform of the heat source: integrate over the y-axis
    x = (np.trapz(y, phiprime, axis=0)).reshape(1, 31, 300)
    gbarbar = np.trapz(x, rprime, axis=1)
    PHI2 = np.meshgrid(phiprime, s)[0]
    sz2 = PHI2.shape
    f = h2 * 37 * Array_Ones((sz2[0], sz[1]))  # boundary condition
    fbar = np.trapz(np.cos(v * (phii - PHI2)) * f, phiprime, 1).reshape(1, n4)  # integrate over y
    A = (alpha / k) * gbarbar + ((np.sqrt(2) * alpha) / k2) * (
            1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2)))))) * fbar
    return np.trapz(exp(-alpha * bm ** 2 * (T[0, i, j] - s)) * A, s)
Upvotes: 2
Views: 1993
Reputation: 1079
Another concept implementation, with processes spawning processes (EDIT: tested with jit):
import numpy as np
# pathos provides better pickling than the standard library
import pathos
from contextlib import closing
from numba import jit

# https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context

class NoDaemonProcess(context.Process):
    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass

    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(pathos.multiprocessing.Pool):
    def Process(self, *args, **kwds):
        return NoDaemonProcess(*args, **kwds)

# matrix dimensions
x = 100  # i
y = 500  # j
NUM_PROCESSES = 10  # a total of NUM_PROCESSES*NUM_PROCESSES processes will be spawned

SS = np.zeros([x, y], dtype=float)

@jit
def foo(i):
    return (i * i + 1)

@jit
def bar(phii, j):
    return phii * (j + 1)

# The sequential code being parallelized:
'''
for i in range(x):
    phii = foo(i)
    for j in range(y):
        SS[i, j] = bar(phii, j)
'''

# Parallel version:
def outer_loop(i):
    phii = foo(i)

    # i and phii are visible here through the enclosing scope
    def inner_loop(j):
        result = bar(phii, j)
        # return the coordinates together with the result
        return (i, j, result)

    with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
        res = list(pool.imap(inner_loop, range(y)))
    return res

with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
    results = list(pool.imap(outer_loop, range(x)))

# flatten the list of lists returned by the outer pool
result_list = []
for r in results:
    result_list += r

# write the results back into the matrix
for res in result_list:
    if res:
        i, j, val = res
        SS[i, j] = val

# check that all cells were filled
print(np.count_nonzero(SS))  # 100*500
EDIT: Explanation.
The reason for all the complications in this code is that I wanted to do more parallelization than the OP asked for. If only the inner loop is parallelized, the outer loop remains, so for each iteration of the outer loop a new process pool is created and the inner-loop computations are performed. Since it seemed to me that the formula does not depend on other iterations of the outer loop, I decided to parallelize everything: now the outer-loop computations are assigned to processes from the pool, and after that each of the "outer-loop" processes creates its own new pool, spawning additional processes to perform the inner-loop computations.
I might be wrong, though, and the outer loop may not be parallelizable; in that case you can keep only the inner process pool.
Using process pools might not be the optimal solution, though, as time will be consumed creating and destroying the pools. A more efficient (but more handwork-requiring) solution is to instantiate N processes once and for all, then feed data into them and receive results through a multiprocessing Queue(). So you should first test whether this multiprocessing solution gives you enough speedup (it will if the time spent constructing and destroying pools is small compared to a ZO_SteadyState run).
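A minimal sketch of that persistent-worker pattern, with a placeholder computation standing in for ZO_SteadyState:

import multiprocessing as mp

def worker(task_q, result_q):
    # each worker loops until it receives the None sentinel
    while True:
        task = task_q.get()
        if task is None:
            break
        i, j = task
        result_q.put((i, j, i * j))  # placeholder for ZO_SteadyState(...)

if __name__ == '__main__':
    n_workers = 4
    task_q, result_q = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(task_q, result_q)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    tasks = [(i, j) for i in range(100) for j in range(500)]  # matches the matrix dimensions above
    for t in tasks:
        task_q.put(t)
    for _ in workers:
        task_q.put(None)  # one sentinel per worker
    results = [result_q.get() for _ in tasks]
    for w in workers:
        w.join()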
The next complication is the artificial non-daemonic pool. A daemon process is used to stop an application gracefully: when the main program exits, daemon processes are terminated silently. However, a daemon process cannot spawn child processes. In your example you need to wait until each process ends to retrieve its data, so I made them non-daemonic to allow spawning the child processes that compute the inner loop.
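A sketch illustrating the restriction (the AssertionError message is what CPython's multiprocessing raises):

import multiprocessing as mp

def child():
    print('child ran')

def spawn_child():
    # in a daemonic process this raises:
    # AssertionError: daemonic processes are not allowed to have children
    mp.Process(target=child).start()

if __name__ == '__main__':
    p = mp.Process(target=spawn_child)
    p.daemon = True  # killed silently when the main program exits
    p.start()
    p.join()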
Data exchange: I suppose that the amount of data needed to fill the matrix, and the time to do it, is small compared to the actual computations. So I use pools and the pool.imap function (which is a bit faster than .map(); you can also try .imap_unordered(), although in your case it should not make a significant difference). Thus the inner pool waits until all the results are computed and returns them as a list. The outer pool therefore returns a list of lists which must be concatenated, and the matrix is then reconstructed from these results in a single fast loop.
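For reference, the three pool methods differ only in laziness and ordering; a toy comparison:

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(4) as pool:
        a = pool.map(square, range(8))                   # blocks, returns a full list in input order
        b = list(pool.imap(square, range(8)))            # lazy iterator, still in input order
        c = list(pool.imap_unordered(square, range(8)))  # lazy, in completion order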
Notice the with closing() construct: it closes the pool automatically after the statements under it finish, avoiding memory consumption by zombie processes.
Also, you might notice that I weirdly defined one function inside another, and that inside the processes I have access to some variables that were never passed to them: i and phii. This works because processes have access to the global scope they were launched from, with copy-on-write semantics (the default fork start method). This does not involve pickling and is fast.
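A small sketch of that inheritance (note that the fork start method is the default on Linux but unavailable on Windows):

import multiprocessing as mp

big = list(range(1000000))  # lives in the parent's memory

def read_global(i):
    # under fork the child inherits `big` without pickling it
    return big[i]

if __name__ == '__main__':
    with mp.get_context('fork').Pool(2) as pool:
        print(pool.map(read_global, [0, 10, 100]))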
The last comment is about using the pathos library instead of the standard multiprocessing, concurrent.futures, subprocess, etc. The reason is that pathos uses a better pickling library, so it can serialize functions that the standard libraries can't (for example, lambda functions). I don't know your function, so I used the more powerful tool to avoid further problems.
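A quick illustration of the pickling difference (a sketch; pathos' ProcessingPool uses dill under the hood):

from pathos.multiprocessing import ProcessingPool

if __name__ == '__main__':
    pool = ProcessingPool(2)
    # the lambda below would raise a PicklingError with the standard multiprocessing.Pool
    print(pool.map(lambda n: n ** 2, range(5)))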
And the very last thing: multiprocessing vs. threading. You can change the pathos processing pool to, say, the standard ThreadPoolExecutor from concurrent.futures, as I did when I first started on this code. But during execution, on my system the CPU was loaded at only 100% total (i.e., one core's worth; all 8 cores appeared loaded at 15-20%). Processes, by contrast, utilize all cores (100% each, 800% total); the usual explanation is CPython's Global Interpreter Lock, which prevents threads from executing Python bytecode in parallel.
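For comparison, the thread-based variant I started with looked roughly like this (it reuses foo, bar, x, and y from the code above; it runs, but CPU-bound Python code gains little because of the GIL):

from concurrent.futures import ThreadPoolExecutor

def outer_loop_threaded(i):
    phii = foo(i)
    # threads share memory, so results can be returned directly
    return [(i, j, bar(phii, j)) for j in range(y)]

with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(outer_loop_threaded, range(x)))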
Upvotes: 1
Reputation: 3985
This is the overall approach I'd probably take. There isn't enough context to give you a more reliable example; you'll have to set all your variables on the class.
import multiprocessing

class mp_Z0:

    def __init__(self, **kwargs):
        # stash every passed-in variable as an attribute
        for k, v in kwargs.items():
            setattr(self, k, v)

    def run(self, i):
        phii = self.phi[i]
        row = []
        for j in range(self.sz[2]):
            s = self.tau[0, i, j, :].reshape(1, self.n4)
            [R3, PHI3, S3] = meshgrid(self.rprime, self.phiprime, s)
            BCoeff = Bessel0(self.bm * R3)
            row.append((i, j, ZO_SteadyState(self.alpha, self.b, self.bm, BCoeff, Bessel_Denom,
                                             self.k2, self.maxt, phii, self.PHI2, PHI3, self.phiprime,
                                             R3, self.rprime, s, S3, self.T, self.v)))
        return row

pool = multiprocessing.Pool(processes=12)
runner = mp_Z0(variable=variable, variable2=variable2)
for row in pool.imap(runner.run, range(sz[1])):
    for i, j, v in row:
        SS[0, i, j] = v
This is an example (assuming everything is in the global namespace) of doing it without classes:
import multiprocessing

def runner_function(i):
    phii = phi[i]
    row = []
    for j in range(sz[2]):
        s = tau[0, i, j, :].reshape(1, n4)
        [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
        BCoeff = Bessel0(bm * R3)
        row.append((i, j, ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii,
                                         PHI2, PHI3, phiprime, R3, rprime, s, S3, T, v)))
    return row

pool = multiprocessing.Pool(processes=12)
for row in pool.imap(runner_function, range(sz[1])):
    for i, j, v in row:
        SS[0, i, j] = v
Upvotes: 0