S. Finch

Reputation: 21

python joblib & random walk - a performance of [CONCURRENT]-process scheduling

Here is my python-3.6 code for simulating a 1D reflected random walk, using the joblib module to generate 400 realizations concurrently across K workers on a Linux cluster machine.

I note, however, that the runtime for K=3 is worse than for K=1, and that the runtime for K=5 is even worse!

Can anyone please see a way to improve my use of joblib?

from math import sqrt
import numpy as np
import joblib as jl
import os

K = int(os.environ['SLURM_CPUS_PER_TASK'])

def f(j):
    N = 10**6
    p = 1/3
    np.random.seed(None)
    X = 2*np.random.binomial(1,p,N)-1   # X = 1 with probability p
    s = 0                               # X =-1 with probability 1-p
    m = 0 
    for t in range(0,N):
        s = max(0,s+X[t])
        m = max(m,s)
    return m

pool = jl.Parallel(n_jobs=K)
W = np.asarray(pool(jl.delayed(f)(j) for j in range(0,400)))
W      

Upvotes: 2

Views: 717

Answers (2)

user3666197

Reputation: 1

a way to improve my use of joblib ?

joblib can and will help, but only for code that can actually benefit from being split across some pool of resources, i.e. where the costs of distribution are lower than the effective speedup gained.

Tweaking joblib's pre_dispatch and batch_size parameters starts to make sense only after the to-be-distributed code has been performance-polished.

Some efforts in this direction, demonstrated below, achieved a core speedup of ~ 8x, reaching still pure-[SERIAL] run-times of ~ 217,000 [us] per one random walk (instead of the ~ 1,640,000 [us] per item implied by the timings reported above).

Only after that does the harder work on cluster-resources-related optimisation (performance-devastation avoidance) make sense for the ab-definitio postulated intention of organising a distributed work-flow over the 400 repetitions defined above.

That makes sense if and only if:

  • CPU-starvation can be avoided, and
  • no excessive add-on costs of distributed job-scheduling have to be paid.

Perhaps a long, but important story about where the performance gets saved or lost:

Executive summary

There could hardly be a better vindication of Dr. Gene AMDAHL's argument.

The inner structure of the above defined task is heavily [SERIAL]:

  • random number genesis is principally a pure-[SERIAL] process, due to PRNG-design
  • 1E6 iterator atop a 1D-vector of pre-calc'd drunken-sailor-steps is pure-[SERIAL]

Yes, the "outer" scope of work (the 400 repetitions of the same process) can easily be converted to a "just"-[CONCURRENT] process-scheduling (not a true-[PARALLEL] one, whatever professors and wannabe gurus may tell you). But the add-on costs of doing so get worse than linearly added to the run-time and, given the [SERIAL] part was not performance re-engineered, the net effect of such efforts can easily devastate the original good intentions (Q.E.D. above, as the posted run-times grew from 10:52 for K == 1 towards ~ 13 minutes for even small values of K).


A brief test proved that the whole task can, after re-engineering with standard python tools, run in a pure-[SERIAL] fashion in under 1.45 [s] (instead of the reported ~ 12 - 13 minutes), even on a rather stone-age archaic desktop device (some in-cache computing effects were possible, yet more as an accidental side-effect than as an intentional, HPC-motivated code-refactoring for HPC-cluster-specific performance):

u@amd64FX:~$ lstopo --of ascii
+-----------------------------------------------------------------+
| Machine (7969MB)                                                |
|                                                                 |
| +------------------------------------------------------------+  |
| | Package P#0                                                |  |
| |                                                            |  |
| | +--------------------------------------------------------+ |  |
| | | L3 (8192KB)                                            | |  |
| | +--------------------------------------------------------+ |  |
| |                                                            |  |
| | +--------------------------+  +--------------------------+ |  |
| | | L2 (2048KB)              |  | L2 (2048KB)              | |  |
| | +--------------------------+  +--------------------------+ |  |
| |                                                            |  |
| | +--------------------------+  +--------------------------+ |  |
| | | L1i (64KB)               |  | L1i (64KB)               | |  |
| | +--------------------------+  +--------------------------+ |  |
| |                                                            |  |
| | +------------++------------+  +------------++------------+ |  |
| | | L1d (16KB) || L1d (16KB) |  | L1d (16KB) || L1d (16KB) | |  |
| | +------------++------------+  +------------++------------+ |  |
| |                                                            |  |
| | +------------++------------+  +------------++------------+ |  |
| | | Core P#0   || Core P#1   |  | Core P#2   || Core P#3   | |  |
| | |            ||            |  |            ||            | |  |
| | | +--------+ || +--------+ |  | +--------+ || +--------+ | |  |
| | | | PU P#0 | || | PU P#1 | |  | | PU P#2 | || | PU P#3 | | |  |
| | | +--------+ || +--------+ |  | +--------+ || +--------+ | |  |
| | +------------++------------+  +------------++------------+ |  |
| +------------------------------------------------------------+  |
|                                                                 |
+-----------------------------------------------------------------+
+-----------------------------------------------------------------+
| Host: amd64FX                                                   |
| Date: Fri 15 Jun 2018 07:08:44 AM                               |
+-----------------------------------------------------------------+

< 1.45 [s]?
Why? How? That is the whole story below ...
( due HPC efforts could push it even well under 1 [s] )


Dr. Gene AMDAHL's argument, even in its original, add-on-overheads-agnostic form in his well-cited report, showed that any composition of [SERIAL] and [PARALLEL] blocks of work has a principally limited benefit from an increasing amount of processing units harnessed for the [PARALLEL] part (a.k.a. the Law of Diminishing Returns: an asymptotically limited speedup even for an infinite number of processors). Any improvement of the [SERIAL] part, on the other hand, continues to increase the speedup additively, in a pure linear fashion. Let me skip here the adverse effects (the add-on overheads, some of which also affect the speedup in a similarly pure linear fashion, but in the adverse sense), as these will get discussed below.
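As a quick numeric illustration (a minimal sketch of mine; the function name and the example figures are illustrative, not taken from Amdahl's report), the classical, overhead-agnostic speedup S = 1 / ( ( 1 - p ) + p / N ) can be evaluated directly:

```python
def amdahl_speedup(p, n):
    """Classical, overhead-agnostic Amdahl speedup for a workload whose
    parallelisable fraction is p, run on n processing units."""
    return 1.0 / ((1.0 - p) + p / n)

# Even a small [SERIAL] fraction caps the achievable speedup hard:
# a 5% [SERIAL] part limits it to 1 / 0.05 == 20x, no matter how many
# processors get harnessed for the [PARALLEL] part.
print(amdahl_speedup(0.95, 4))       # modest gain on 4 units
print(amdahl_speedup(0.95, 10**9))   # asymptotically approaches 20x
```
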

Step No. 1:
Repair the code, so as to make something useful.

Under Python 2, the code as-is produces no random walk at all (Python 3.6's true division makes 1/3 a float, so this trap bites only if the code ever runs under Python 2 - still worth demonstrating):

Why ?

>>> [ op( np.random.binomial( 1, 1 /3,  int( 1E9 ) ) ) for op in ( sum, min, max, len ) ]  # Python 2
[0, 0, 0, 1000000000]

So,
under Python 2 the code as-is produces a rather expensive array of a-priori known constants. No randomness at all. Damned Python-2 rounding of integer division. :o)

>>> [ op( np.random.binomial( 1, 1./3., int( 1E9 ) ) ) for op in ( sum, min, max, len ) ]
[333338430, 0, 1, 1000000000]

So this is fixed.


Step No. 2:
Understand the overheads ( and best also any hidden performance-blockers )

Any attempt to instantiate a distributed process (each one of the instructed K joblib-spawned processes, using the multiprocessing, not the threading, backend) makes you pay a cost. Always.

Your code execution gets additional [SERIAL] add-on code that has to run before any, still just theoretical, ( 1 / n_jobs ) split-effect may start to take place.

A closer look at the "useful" work:

def f( j ):                                         # T0
    #pass;   np.random.seed( None )                 #  +      ~ 250 [us]
    prnGEN = np.random.RandomState()                #  +      ~ 230 [us]
    # = 2 * np.random.binomial( 1, 1./3., 1E6 ) - 1 #  +  ~ 465,000 [us]
    X =        prnGEN.binomial( 1, 1./3., 1E6 )     #  +  ~ 393,000
    X*= 2                                           #  +    ~ 2,940 [us]
    X-= 1                                           #  +    ~ 2,940 [us]
    s = 0; m = 0                                    #  +        ~ 3 [us]
    for t in range( 0, int( 1E6 ) ):                #     ( py3+ does not allocate range() but works as an xrange()-generator
        s = max( 0, s + X[t] )                      #  +       ~ 15 [us] cache-line friendly consecutive { hit | miss }-rulez here, heavily ...
        m = max( m, s )                             #  +       ~  5 [us]
    return  m                                       # = ~ 2,150,000 [us] @  i5/2.67 GHz
#                                                   # = ~ 1,002,250 [us] @ amd/3.6  GHz

For this sort of workpackage, the best demonstration-purpose speedups come from non-interpreted, GIL-free code packages: Cython-ised, cdef-ed with a nogil directive, and spawned from a threading-backend multiprocessing.Pool. One may expect such code to execute in about ~ 217,000 [us] per one pure-[SERIAL] random walk of 1E6 steps, at which point it starts to make sense to harness the pool of code-execution nodes, with some pre_dispatch tweaking, so as not to let them starve. Yet, all premature-optimisation warnings are due and valid in this simplified context, and proper engineering practices are to be used for achieving a professional-grade result.
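For completeness, a minimal pure-numpy sketch of such a [SERIAL]-part re-engineering (my own illustration, not necessarily the code behind the ~ 8x figure above): the reflected-walk recursion s_t = max( 0, s_t-1 + X_t ) obeys the Lindley identity s_t = S_t - min( S_0 .. S_t ), where S is the running prefix-sum, so the 1E6-step interpreter loop collapses into a few vectorised numpy calls:

```python
import numpy as np

def max_reflected_walk_loop(X):
    # the original interpreted loop, kept here only for cross-checking
    s = m = 0
    for x in X:
        s = max(0, s + x)
        m = max(m, s)
    return int(m)

def max_reflected_walk_vec(X):
    # Lindley identity: s_t == S_t - min_{0<=k<=t} S_k, with S_0 == 0
    S = np.concatenate(([0], np.cumsum(X)))
    return int((S - np.minimum.accumulate(S)).max())

rng = np.random.RandomState(0)                 # fixed seed, illustrative only
X = 2 * rng.binomial(1, 1. / 3., 10**5) - 1
assert max_reflected_walk_vec(X) == max_reflected_walk_loop(X)
```

The interpreter then executes a handful of C-level array passes instead of 1E6 bytecode iterations, which is exactly the kind of [SERIAL]-part polish that pays off linearly, before any distribution is attempted.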

Some tools may help you see, down to the assembly level, how many instructions were actually added by any respective high-level-language syntax-constructor element (or by concurrency / parallelisation #pragma masquerades), to "smell" the add-on processing costs one will pay during the finalised code-execution.


Given these add-on processing costs, a "small" (thin) amount of work "inside" the "just"-concurrently executed tasks (beware: a true [PARALLEL] scheduling is not automatically possible) may make you pay way more in add-on costs than you receive back from the split.

The blockers:

Any add-on communication / synchronisation may further devastate the theoretical speedup of the flow of code-execution. Locks, the GIL (avoided only when not using the threading backend), semaphores, socket communications, sharing et al are the common blockers.

For a carefully designed source of randomness, any draw from such a "device" must also be centrally re-synchronised, so as to preserve the quality of the randomness. This can cause additional trouble behind the curtain (a common issue on systems with authority-certified sources of randomness).


The next step?

Read more details on Amdahl's Law, best in its contemporary re-formulated version, where both setup and termination overheads are added in the overhead-strict mode, and an atomicity-of-processing is also taken into account for a practical evaluation of realistic speedup limits.

Next: measure the net-duration costs of your code, and you will indirectly get the add-on costs of the setup + termination overheads of your in-vivo system execution.

import time

def f( j ):
    ts = time.time()
    #------------------------------------------------------<clock>-ed SECTION
    N = 10**6
    p = 1./3.
    np.random.seed( None )                    # RandomState coordination ...
    X = 2 * np.random.binomial( 1, p, N ) - 1 # X = 1 with probability p
    s = 0                                     # X =-1 with probability 1-p
    m = 0 
    for t in range( 0, N ):
        s = max( 0, s + X[t] )
        m = max( m, s )
    #------------------------------------------------------<clock>-ed SECTION
    return ( m, time.time() - ts )            # tuple
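A minimal driver sketch for that instrumented function (the name and the cheap stand-in payload are mine, just to make the accounting concrete): the difference between the wall-clock duration of the whole mapping and the sum of the returned per-task net durations is exactly the add-on overhead paid:

```python
import time

def clocked_payload(j):
    ts = time.time()
    # ------------------------------------------------<clock>-ed SECTION
    m = sum(range(10**5))            # stand-in for one walk's useful work
    # ------------------------------------------------<clock>-ed SECTION
    return (m, time.time() - ts)

t0 = time.time()
W = [clocked_payload(j) for j in range(8)]   # swap in Parallel(n_jobs=K)(...) here
wall = time.time() - t0

net = sum(duration for _m, duration in W)    # "useful work" time only
overhead = wall - net                        # spawn / transport / termination
                                             # costs (~0 for this in-process run)
```

Run the same driver once serially and once through joblib.Parallel(): the net figures should stay about the same, while the wall-clock figure exposes everything you pay on top.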

For classroom tutorials, I've successfully parallelized my random walk code using special modules within R, Matlab, Julia & Stata. (By "successfully", I mean it is abundantly clear that 20 workers do at least 10 times as much work as 1 worker in the same time interval.) Is such internal parallelization not feasible in Python?

Well, the last comment seems to display some sort of inconvenience towards the people who tried to help and who brought in the reasons why the posted code, as-is, works as was observed. It was not our choice to define the processing strategy this exact way, was it?

So, again: given the original decision was to use the python-3.6 + joblib.Parallel() + joblib.delayed() instrumentation, simply Alea Iacta Est ...

What might have worked (as cited) for { R | MATLAB | Julia | Stata } simply does not mean it will work the same way in a GIL-stepped python, let alone in joblib.Parallel()-spawned ecosystems.

The first cost one will ALWAYS pay for a joblib.Parallel()-spawned job is the cost of re-constructing a whole, 1:1 copy of the current python-interpreter state. Given that the current state contains way more object instances than a stripped-down MCVE script (as @rth demonstrated), the whole, many-times-replicated memory image first has to get copied + transported + re-constructed onto all the distributed-processing nodes across the SLURM-managed cluster footprint, all of which adds (non-productive) overhead time. If in doubt, add a few GB-sized numpy arrays into the python interpreter's state, put the measured timestamps for the respective duration calculations into the first and last array cells, and finally return ( m, aFatArray ). The overall execution times will jump higher, as both the initial 1:1 copy and the return path have to move a much larger amount of data there and back (again, details on instantiation-related add-on costs have been posted in many places here, including templates for systematic benchmarking of the respective add-on costs).

Exactly this was the reason to advise the kind O/P to indeed measure the effective amount of computing time (the "benefit" side of the elementary cost/benefit argument). That measurement is cheap to get in the trivialised experiment, and it will show the scale, the sum, and the actual proportion of the useful work inside the "remote"-executed computing payloads. With the code modification proposed above, W[:, 1] tells the actual "net" computing cost of the "useful work" spent during the efficient workpackage computation, once the workpackages have finally arrived and been activated inside the respective "remote" code-execution ecosystems (here, the joblib.Parallel()-spawned binary full-scale replicas of the original python interpreter). The flow of time between the start and the end of the main-code execution, in contrast, shows the actual expenses: the lump sum of time spent, including all the "remote"-process instantiations, all the respective workpackage distributions, and all the "remote"-process terminations.


A final remark for those who have not spent time reading about the source-of-randomness-related problems:

Any good practice shall rather avoid blocking logic hidden behind "shared randomness". Better use individually configured PRNG sources. If interested, or in need of a certifiable PRNG robustness, feel free to read more on this in a discussion here.
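A minimal sketch of such individually configured PRNG sources (the function name, the seed-offset scheme and the base seed are my illustrative choices): give each task its own private numpy RandomState instance, instead of touching the shared module-level generator:

```python
import numpy as np

def walk_max(j, base_seed=1234):                  # base_seed: illustrative choice
    rng = np.random.RandomState(base_seed + j)    # private, per-task PRNG state
    X = 2 * rng.binomial(1, 1. / 3., 10**4) - 1
    s = m = 0
    for x in X:
        s = max(0, s + x)
        m = max(m, s)
    return int(m)

# reproducible per task, with no shared generator to synchronise on
assert walk_max(7) == walk_max(7)
```

On numpy >= 1.17, np.random.SeedSequence and its spawn() method are the cleaner tool for deriving statistically independent per-worker streams.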

Upvotes: 7

rth

Reputation: 3106

@user3666197 wrote a very nice answer about overheads, with a lot of bolded text ;) However, I want to draw your attention to the fact that when you run your code with K=1, you do only one random walk. With K=3 or 5 you perform 3 or 5 random walks simultaneously (it seems). So you need to multiply the runtime of K=1 by 3 or 5 to get the runtime required to get the same job done. I guess this runtime will be much bigger than the one you got.

Well, to provide a useful answer, not just a note (the OP is right in the comments): the multiprocessing module seems a better choice. Here is your code:

from math import sqrt
import numpy as np

from multiprocessing import Pool
import os

K = int(os.environ['NTASK'])


def f(j):
    N = 10**6
    p = 1./3.
    np.random.seed(None)
    X = 2*np.random.binomial(1,p,N)-1   # X = 1 with probability p
    s = 0                               # X =-1 with probability 1-p
    m = 0 
    for t in range(0,N):
        s = max(0,s+X[t])
        m = max(m,s)
    return m

pool = Pool(processes=K) 
print(pool.map(f, range(40)))

and the performance

$ time NTASK=1 python stof.py                                                                                                
[21, 19, 17, 17, 18, 16, 17, 17, 19, 19, 17, 16, 18, 16, 19, 22, 20, 18, 16, 17, 17, 16, 18, 18, 17, 17, 19, 17, 19, 19, 16, 16, 18, 17, 18, 18, 19, 20, 16, 19]

real    0m30.367s
user    0m30.064s
sys     0m 0.420s

$ time NTASK=2 python stof.py                                                                                                
[18, 16, 16, 17, 19, 17, 21, 18, 19, 21, 17, 16, 15, 25, 19, 16, 20, 17, 15, 19, 17, 16, 20, 17, 16, 16, 16, 16, 17, 23, 17, 16, 17, 17, 19, 16, 17, 16, 19, 18]

real    0m13.428s
user    0m26.184s
sys     0m 0.348s

$ time NTASK=3 python stof.py 
[18, 17, 16, 19, 17, 18, 20, 17, 21, 16, 16, 16, 16, 17, 22, 18, 17, 15, 17, 19, 18, 16, 15, 16, 16, 24, 20, 16, 16, 16, 22, 19, 17, 18, 18, 16, 16, 19, 17, 18]

real    0m11.946s
user    0m29.424s
sys     0m 0.308s

$ time NTASK=4 python stof.py
[16, 19, 17, 16, 19, 17, 17, 16, 18, 22, 16, 21, 16, 18, 15, 16, 20, 17, 22, 17, 16, 17, 17, 20, 22, 21, 17, 17, 16, 17, 19, 16, 19, 16, 16, 18, 25, 21, 19, 18]

real    0m 8.206s
user    0m26.580s
sys     0m 0.360s

Upvotes: 3
