NoviceCoder

Reputation: 474

How to use multi-threading to speed up nested for loop calculation?

I'm trying to perform numerical integration on a large array and the computation takes a very long time. I tried to speed up my code by using numba and the jit decorator, but numpy.trapz isn't supported.

My new idea is to create n threads to run the calculations in parallel, but I'm wondering how I could do this, or whether it's even feasible.

Referencing the code below:

Can I spawn sz[2] threads to run at the same time, each calling ZO_SteadyState to calculate values?

    for i in range(sz[1]):
        phii = phi[i]
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            SS[0, i, j] = ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii,
                                         PHI2, PHI3, phiprime, R3, rprime, s, S3, T, v)

The calculation in question.

    @jit()
    def ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii,
                       PHI2, PHI3, phiprime, R3, rprime, s, S3, T, v):
        # stationary point heat source.
        g = 1000000 * exp(-(10 ** 5) * (R3 - (b / maxt) * S3) ** 2) * (
                exp(-(10 ** 5) * (PHI3 - 0) ** 2)
                + exp(-(10 ** 5) * (PHI3 - 2 * np.pi) ** 2)
                + exp(-(10 ** 5) * (PHI3 - 2 * np.pi / 3) ** 2)
                + exp(-(10 ** 5) * (PHI3 - 4 * np.pi / 3) ** 2))

        y = R3 * ((np.sqrt(2) / b) * (1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2))))))
                  * (BCoeff / Bessel_Denom)) * np.cos(v * (phii - PHI3)) * g

        x = (np.trapz(y, phiprime, axis=0)).reshape(1, 31, 300)

        # integral transform of heat source. integral over y-axis
        gbarbar = np.trapz(x, rprime, axis=1)

        PHI2 = np.meshgrid(phiprime, s)[0]

        sz2 = PHI2.shape
        f = h2 * 37 * Array_Ones((sz2[0], sz[1]))  # boundary condition.

        fbar = np.trapz(np.cos(v * (phii - PHI2)) * f, phiprime, 1).reshape(1, n4)  # integrate over y

        A = (alpha / k) * gbarbar + ((np.sqrt(2) * alpha) / k2) * (
                1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2)))))) * fbar

        return np.trapz(exp(-alpha * bm ** 2 * (T[0, i, j] - s)) * A, s)

Upvotes: 2

Views: 1993

Answers (2)

Slowpoke

Reputation: 1079

Another proof-of-concept implementation, with processes spawning processes (EDIT: tested with jit):

    import numpy as np

    # pathos is used for its better pickling
    import pathos.multiprocessing
    from contextlib import closing

    from numba import jit

    # https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
    import multiprocess.context as context

    class NoDaemonProcess(context.Process):
        def _get_daemon(self):
            return False
        def _set_daemon(self, value):
            pass
        daemon = property(_get_daemon, _set_daemon)

    class NoDaemonPool(pathos.multiprocessing.Pool):
        def Process(self, *args, **kwds):
            return NoDaemonProcess(*args, **kwds)


    # matrix dimensions
    x = 100  # i
    y = 500  # j

    NUM_PROCESSES = 10  # NUM_PROCESSES*NUM_PROCESSES processes will be spawned in total

    SS = np.zeros([x, y], dtype=float)

    @jit
    def foo(i):
        return i * i + 1

    @jit
    def bar(phii, j):
        return phii * (j + 1)

    # The serial code being parallelized:
    '''
    for i in range(x):
        phii = foo(i)
        for j in range(y):
            SS[i, j] = bar(phii, j)
    '''

    # Parallel version:

    def outer_loop(i):
        phii = foo(i)

        # i and phii are in this process's scope
        def inner_loop(j):
            result = bar(phii, j)
            # the data is coordinates plus the result
            return (i, j, result)

        with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
            res = list(pool.imap(inner_loop, range(y)))
        return res

    with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
        results = list(pool.imap(outer_loop, range(x)))

    # flatten the list of per-row result lists
    result_list = []
    for r in results:
        result_list += r

    # write the results back into the matrix
    for res in result_list:
        if res:
            i, j, val = res
            SS[i, j] = val

    # check that all cells are filled
    print(np.count_nonzero(SS))  # 100*500

EDIT: Explanation.

The reason for all the complications in this code is that I wanted to parallelize more than the OP asked for. If only the inner loop is parallelized, the outer loop remains, so a new process pool is created and the inner-loop computations are performed for each iteration of the outer loop. Since, as far as I could tell, the formula does not depend on other iterations of the outer loop, I decided to parallelize everything: the outer-loop computations are assigned to processes from the pool, and each of those "outer-loop" processes then creates its own pool and spawns additional processes to compute the inner loop.

I might be wrong, though, and the outer loop may not be parallelizable; in that case you can keep only the inner process pool.

Using process pools may not be the optimal solution, though, since time is spent on creating and destroying the pools. A more efficient (but more hands-on) solution is to instantiate N processes once and for all, then feed data into them and receive results through a multiprocessing Queue(), as in the sketch below. So you should first test whether this multiprocessing solution gives you enough speedup (it will if the time spent constructing and destroying pools is small compared to a ZO_SteadyState run).
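A minimal sketch of that persistent-worker pattern, assuming picklable task data; worker and compute are illustrative placeholders, with compute standing in for ZO_SteadyState:

    import multiprocessing as mp

    def compute(i, j):
        return i * j  # placeholder for the real per-cell computation

    def worker(tasks, results):
        # each worker lives for the whole run and pulls jobs until it sees the sentinel
        for i, j in iter(tasks.get, None):
            results.put((i, j, compute(i, j)))

    if __name__ == "__main__":
        tasks, results = mp.Queue(), mp.Queue()
        procs = [mp.Process(target=worker, args=(tasks, results)) for _ in range(8)]
        for p in procs:
            p.start()

        jobs = [(i, j) for i in range(10) for j in range(20)]
        for job in jobs:
            tasks.put(job)
        for _ in procs:
            tasks.put(None)  # one sentinel per worker shuts the pool down

        out = [results.get() for _ in jobs]
        for p in procs:
            p.join()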

The next complication is the artificial non-daemon pool. A daemon process is used to stop an application gracefully: when the main program exits, daemon processes are terminated silently. However, a daemon process cannot spawn child processes. In your example you need to wait until each process finishes in order to retrieve the data, so I made them non-daemon, which allows them to spawn the child processes that compute the inner loop.

Data exchange: I assume that the amount of data needed to fill the matrix, and the time needed to do it, are small compared to the actual computations. So I use pools and the pool.imap function (which is a bit faster than .map(); you can also try .imap_unordered(), although in your case it should not make a significant difference). The inner pool thus waits until all the results are computed and returns them as a list. The outer pool therefore returns a list of lists, which must be concatenated. The matrix is then rebuilt from these results in a single fast loop.

Notice the with closing() construct: it closes the pool automatically once the statements under it have finished, avoiding memory consumption by zombie processes.

Also, you might notice that I defined one function inside another, and that inside the processes I have access to variables that were never passed to them: i and phii. This works because processes have access to the global scope they were launched from, with a copy-on-write policy (the default fork mode). This does not involve pickling and is fast.
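A minimal sketch of that inheritance, assuming a fork-capable platform such as Linux (big and child are illustrative names):

    import multiprocessing as mp

    big = list(range(1_000_000))  # lives in the parent's global scope

    def child(i):
        # under fork, `big` is inherited by the worker without any pickling
        return big[i]

    if __name__ == "__main__":
        with mp.get_context("fork").Pool(4) as pool:
            print(pool.map(child, [0, 1, 2]))  # [0, 1, 2]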

The last comment is about using the pathos library instead of the standard multiprocessing, concurrent.futures, subprocess, etc. The reason is that pathos uses a better pickling library (dill), so it can serialize functions which the standard libraries can't (for example, lambda functions). I don't know about your function, so I used the more powerful tool to avoid further problems.
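For instance, this small sketch works with pathos but would raise a PicklingError with the stdlib multiprocessing pool, because plain pickle cannot serialize lambdas:

    from pathos.multiprocessing import ProcessingPool

    pool = ProcessingPool(4)
    print(pool.map(lambda x: x * x, range(8)))  # [0, 1, 4, 9, 16, 25, 36, 49]
    pool.close()
    pool.join()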

And the very last thing: multiprocessing vs. threading. You can change the pathos processing pool to, say, the standard ThreadPoolExecutor from concurrent.futures, as I did at the beginning when I was just starting on this code. But during execution, on my system the CPU was then loaded at only 100% total (i.e. one core's worth; it appears as all 8 cores loaded at 15-20%). I'm not skilled enough to explain all the differences between threads and processes, but it seems to me that processes allow utilizing all cores (100% each, 800% total); in CPython this is because threads share the interpreter's GIL, so only one thread runs Python bytecode at a time.
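A small benchmark sketch of that difference on a CPU-bound task (burn and the workload sizes are arbitrary; on a multi-core machine the process pool should finish several times faster):

    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    def burn(n):
        # pure-Python CPU-bound work, so threads are serialized by the GIL
        s = 0
        for i in range(n):
            s += i * i
        return s

    if __name__ == "__main__":
        work = [10 ** 7] * 8
        for Executor in (ThreadPoolExecutor, ProcessPoolExecutor):
            t0 = time.perf_counter()
            with Executor(max_workers=8) as ex:
                list(ex.map(burn, work))
            print(Executor.__name__, round(time.perf_counter() - t0, 2), "s")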

Upvotes: 1

CJR

Reputation: 3985

This is the overall approach I'd probably take. There's not enough context to give you a more reliable example; you'll have to load all your variables into the class.

    import multiprocessing

    class mp_Z0:

        def __init__(self, **kwargs):
            for k, v in kwargs.items():
                setattr(self, k, v)

        def run(self, i):
            phii = self.phi[i]
            row = []
            for j in range(self.sz[2]):
                s = self.tau[0, i, j, :].reshape(1, self.n4)
                [R3, PHI3, S3] = meshgrid(self.rprime, self.phiprime, s)
                BCoeff = Bessel0(self.bm * R3)

                row.append((i, j, ZO_SteadyState(self.alpha, self.b, self.bm, BCoeff, Bessel_Denom,
                                                 self.k2, self.maxt, phii, self.PHI2, PHI3, self.phiprime,
                                                 R3, self.rprime, s, S3, self.T, self.v)))
            return row

    pool = multiprocessing.Pool(processes=12)
    runner = mp_Z0(variable=variable, variable2=variable2)

    for row in pool.imap(runner.run, range(sz[1])):
        for i, j, v in row:
            SS[0, i, j] = v

This is an example of doing it without classes (assuming everything needed is in scope):

    import multiprocessing

    def runner_function(i):
        phii = phi[i]
        row = []
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            row.append((i, j, ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii, PHI2, PHI3,
                                             phiprime, R3, rprime, s, S3, T, v)))
        return row

    pool = multiprocessing.Pool(processes=12)

    for row in pool.imap(runner_function, range(sz[1])):
        for i, j, v in row:
            SS[0, i, j] = v

Upvotes: 0
