Mike O'Connor

Reputation: 2700

Poor man's Python data parallelism doesn't scale with the # of processors? Why?

I need to run many computational iterations, say 100 or so, of a very complicated function that accepts a number of input parameters. Though the parameters will vary, each iteration will take nearly the same amount of time to compute. I found this post by Muhammad Alkarouri and modified his code to fork the script N times, where I plan to set N to at least the number of cores on my desktop computer.

It's an old MacPro 1,0 running 32-bit Ubuntu 16.04 with 18GB of RAM, and in none of the runs of the test file below does the RAM usage exceed about 15% (swap is never used). That's according to the System Monitor's Resources tab, on which it is also shown that when I try running 4 iterations in parallel all four CPUs are running at 100%, whereas if I only do one iteration only one CPU is 100% utilized and the other 3 are idling. (Here are the actual specs for the computer. And cat /proc/self/status shows a Cpus_allowed of 8, twice the total number of cpu cores, which may indicate hyper-threading.)

So I expected the 4 simultaneous runs to take only a little more time than one (and, going forward, I expected run times to scale roughly in inverse proportion to the number of cores on any computer). Instead, the running time for 4 is a bit more than twice the running time for one. For example, with the scheme presented below, a sample "time(1) real" value for a single iteration is 0m7.695s, whereas for 4 "simultaneous" iterations it is 0m17.733s. And when I go from running 4 iterations at once to 8, the running time scales in proportion.
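To put a number on the gap, the two timings quoted above can be turned into a throughput speedup and a parallel efficiency. This is only an arithmetic sketch in Python 3; the helper name parallel_efficiency is made up for illustration and is not part of the original script.

```python
def parallel_efficiency(t_single, t_parallel, n_jobs):
    """Return (speedup, efficiency) for n_jobs equal jobs run at once.

    t_single   -- wall-clock seconds for one job run alone
    t_parallel -- wall-clock seconds for n_jobs jobs run simultaneously
    """
    # Running the jobs one after another would take n_jobs * t_single.
    speedup = (n_jobs * t_single) / t_parallel
    # An efficiency of 1.0 would mean perfect scaling across n_jobs workers.
    efficiency = speedup / n_jobs
    return speedup, efficiency


# Timings quoted in the question: 7.695 s for one run, 17.733 s for four.
speedup, efficiency = parallel_efficiency(7.695, 17.733, 4)
print("speedup: %.2f, efficiency: %.2f" % (speedup, efficiency))
```

Perfect scaling on 4 cores would give a speedup of 4; these timings work out to roughly 1.7, or about 43% efficiency.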

So my question is, why doesn't it scale as I had supposed (and can anything be done to fix that)? This is, by the way, for deployment on a few desktops; it doesn't have to scale or run on Windows.

Also, I have for now neglected the multiprocessing.Pool() alternative as my function was rejected as not being pickleable.
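For readers wondering what "not pickleable" means in practice, here is a quick check, written for Python 3 (which uses pickle rather than cPickle). The helper is_picklable is a hypothetical name, and the lambda is only a stand-in: the reason the actual function was rejected is not known from this post.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
        return True
    except Exception:
        # pickle raises different exception types depending on the object
        # and the Python version, so catch broadly for this quick check.
        return False


# A function reachable by name in an importable module is pickled by reference.
print(is_picklable(math.sqrt))
# A lambda has no importable name, so pickling it fails.
print(is_picklable(lambda x: x + 1))
```

Functions defined at the top level of an importable module generally pass this check, while lambdas and functions nested inside other functions generally do not.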

Here is the modified Alkarouri script, multifork.py:

#!/usr/bin/env python
import os, cPickle, time

import numpy as np
np.seterr(all='raise')

def function(x):
    print '... the parameter is ', x
    arr = np.zeros(5000).reshape(-1,1)
    for r in range(3):
        for i in range(200):
            arr = np.concatenate( (arr, np.log( np.arange(2, 5002) ).reshape(-1,1) ), axis=1 )
    return { 'junk': arr, 'shape': arr.shape }

def run_in_separate_process(func, *args, **kwds):
    numruns = 4
    result = [ None ] * numruns
    pread = [ None ] * numruns
    pwrite = [ None ] * numruns
    pid = [ None ] * numruns
    for i in range(numruns):
        pread[i], pwrite[i] = os.pipe()
        pid[i] = os.fork()
        if pid[i] > 0:
            pass
        else: 
            os.close(pread[i])
            result[i] = func(*args, **kwds)
            with os.fdopen(pwrite[i], 'wb') as f:
                cPickle.dump((0,result[i]), f, cPickle.HIGHEST_PROTOCOL)
            os._exit(0)

    #time.sleep(17)
    while True:
        for i in range(numruns):
            os.close(pwrite[i])
            with os.fdopen(pread[i], 'rb') as f:
                stat, res = cPickle.load(f)
                result[i] = res
            #os.waitpid(pid[i], 0)
        if None not in result: break
    return result

def main():
    print 'Running multifork.py.'

    print run_in_separate_process( function, 3 )

if __name__ == "__main__":
    main()

With multifork.py, uncommenting os.waitpid(pid[i], 0) has no effect. Neither does uncommenting time.sleep(), provided 4 iterations are being run at once and the delay is not set to more than about 17 seconds. Given that time(1) real is something like 0m17.733s for 4 iterations done at once, I take that as an indication that the while True loop is not itself a cause of any appreciable inefficiency (because the processes all take the same amount of time) and that the 17 seconds are indeed being consumed solely by the child processes.

Out of a profound sense of mercy I have spared you, for now, my other scheme, in which I used subprocess.Popen() in lieu of os.fork(). With that one I had to send the function via a file to an auxiliary script, the script named by the command that is the first argument of Popen(). I did, however, use the same while True loop. And the results? They were almost exactly the same as with the simpler scheme that I am presenting here.

Upvotes: 2

Views: 896

Answers (2)

frist

Reputation: 1958

Why don't you use joblib's Parallel feature?

#!/usr/bin/env python

from joblib import Parallel, delayed

import numpy as np
np.seterr(all='raise')

def function(x):
    print '... the parameter is ', x
    arr = np.zeros(5000).reshape(-1,1)
    for r in range(3):
        for i in range(200):
            arr = np.concatenate( (arr, np.log( np.arange(2, 5002) ).reshape(-1,1) ), axis=1 )
    return { 'junk': arr, 'shape': arr.shape }

def main():
    print 'Running multifork.py.'

    print Parallel(n_jobs=2)(delayed(function)(3) for _ in xrange(4))

if __name__ == "__main__":
    main()

It seems that you have some bottleneck in your computations.

In your example you pass the data over a pipe, which is not a very fast method. To avoid this performance problem you should use shared memory where you can; joblib.Parallel, for instance, can memory-map large NumPy input arrays instead of pickling them.

Also, remember that in the single-process case you don't have to serialize and deserialize the data, but in the multiprocess case you do.
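A rough way to gauge that serialization cost is to pickle an array of the same shape as the question's result and time the round trip. This is a sketch in Python 3 (pickle instead of cPickle); it measures pickling in memory only, not the pipe transfer itself, and the timing will differ from machine to machine.

```python
import pickle
import time

import numpy as np

# Same shape as the question's result: 5000 rows, 1 zero column + 600 log columns.
arr = np.zeros((5000, 601))
arr[:, 1:] = np.log(np.arange(2, 5002)).reshape(-1, 1)
result = {'junk': arr, 'shape': arr.shape}

start = time.perf_counter()
payload = pickle.dumps((0, result), pickle.HIGHEST_PROTOCOL)  # what the child writes
restored = pickle.loads(payload)                              # what the parent reads
elapsed = time.perf_counter() - start

print("array size:  %.1f MB" % (arr.nbytes / 1e6))
print("pickle size: %.1f MB" % (len(payload) / 1e6))
print("round trip:  %.3f s" % elapsed)
```

The array is about 24 MB per child, so comparing the printed round-trip time with the multi-second run time gives a feel for how much of the total the serialization step could account for.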

Next, even if you see 8 cores, hyper-threading may be enabled, which splits each core into 2 hardware threads; thus if you have 4 HW cores the OS thinks that there are 8 of them. HT has advantages and disadvantages, but the main point is that if you are going to load all your cores with computation for a long time, you should disable it.

For example, I have an Intel(R) Core(TM) i3-2100 CPU @ 3.10GHz with 2 HW cores and HT enabled, so top shows 4 CPUs. My computation times are:

  • n_jobs=1 - 0:07.13
  • n_jobs=2 - 0:06.54
  • n_jobs=4 - 0:07.45

This is how my lscpu looks like:

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                4
On-line CPU(s) list:   0-3
Thread(s) per core:    2
Core(s) per socket:    2
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 42
Stepping:              7
CPU MHz:               1600.000
BogoMIPS:              6186.10
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              3072K
NUMA node0 CPU(s):     0-3

Note the Thread(s) per core line.
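The logical and physical core counts can also be pulled out of that lscpu output programmatically. The sketch below (Python 3) parses a text sample taken from the listing above; core_counts is a hypothetical helper, and it assumes the English field names shown here, which may differ between lscpu versions and locales.

```python
SAMPLE_LSCPU = """\
CPU(s):                4
On-line CPU(s) list:   0-3
Thread(s) per core:    2
Core(s) per socket:    2
Socket(s):             1
"""


def core_counts(lscpu_text):
    """Return (logical_cpus, physical_cores) parsed from lscpu output."""
    fields = {}
    for line in lscpu_text.splitlines():
        # Split each "Name:   value" line at the first colon.
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip()] = value.strip()
    logical = int(fields["CPU(s)"])
    physical = int(fields["Core(s) per socket"]) * int(fields["Socket(s)"])
    return logical, physical


logical, physical = core_counts(SAMPLE_LSCPU)
print("logical CPUs: %d, physical cores: %d" % (logical, physical))
```

On a Linux machine the same function could be fed the live output, for example from subprocess.check_output(["lscpu"], universal_newlines=True).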

So in your example there is not so much computation as data transfer. Your application doesn't run long enough to benefit from parallelism. With a long computation job (about 10 minutes), I think you would see the benefit.

ADDITION:

I've taken a more detailed look at your function. I replaced the multiple executions with a single call to function(3) and ran it under the profiler:

$ /usr/bin/time -v python -m cProfile so.py

The output is quite long; you can view the full version here (http://pastebin.com/qLBBH5zU). The main point is that the program spends most of its time in the numpy.concatenate function, as you can see:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
.........
600    1.375    0.002    1.375    0.002 {numpy.core.multiarray.concatenate}
.........
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:01.64
.........

If you run multiple instances of this program, you will see that the execution time of each individual instance increases considerably. I started 2 copies simultaneously:

$ /usr/bin/time -v python -m cProfile prog.py & /usr/bin/time -v python -m cProfile prog.py &

On the other hand I wrote a small fibo function:

def fibo(x):
    arr = (0, 1)
    for _ in xrange(x):
        arr = (arr[-1], sum(arr))
    return arr[0]

And replaced the concatenate line with fibo(10000). In this case the execution time of a single instance is 0:22.82, while two simultaneous instances take almost the same time per instance (0:24.62).

Based on this, I think numpy perhaps uses some shared resource, which leads to the parallelization problem. Or it may be a numpy- or scipy-specific issue.
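One candidate for such a shared resource, offered here as an unmeasured assumption rather than a diagnosis, is memory bandwidth: every np.concatenate call allocates a new array and copies all existing columns into it, so the loop moves far more data than the final array holds. The arithmetic can be sketched in plain Python 3 (both function names are made up for illustration):

```python
def bytes_written_by_loop(n_rows=5000, n_appends=600, itemsize=8):
    """Estimate bytes written when appending one column at a time.

    Each np.concatenate call allocates a fresh array and copies every
    existing column plus the new one, so call k writes (k + 1) columns.
    """
    total = 0
    for k in range(1, n_appends + 1):
        total += n_rows * (k + 1) * itemsize
    return total


def bytes_in_final_array(n_rows=5000, n_appends=600, itemsize=8):
    """Size of the finished (n_rows, n_appends + 1) float64 array."""
    return n_rows * (n_appends + 1) * itemsize


written = bytes_written_by_loop()
final = bytes_in_final_array()
print("written by loop: %.2f GB" % (written / 1e9))
print("final array:     %.3f GB" % (final / 1e9))
print("ratio:           %.0fx" % (written / final))
```

By this estimate the loop writes about 7.2 GB to produce a 24 MB array. If several processes do that at once, they may end up competing for the same memory bus, which the fibo test would not exercise.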

One last thing about the code: you should replace the block below:

for r in range(3):
    for i in range(200):
        arr = np.concatenate( (arr, np.log( np.arange(2, 5002) ).reshape(-1,1) ), axis=1 )

with this single line:

arr = np.concatenate( (arr, np.log(np.arange(2, 5002).repeat(3*200).reshape(-1,3*200))), axis=1 )
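To check that the single-line version builds the same array as the nested loop, the two can be compared on a smaller problem. This is a sketch in Python 3; the function names and the n_rows and n_cols parameters are introduced here only for the comparison.

```python
import numpy as np


def build_with_loop(n_rows, n_cols):
    """Append one log column at a time, as in the original nested loop."""
    arr = np.zeros(n_rows).reshape(-1, 1)
    col = np.log(np.arange(2, n_rows + 2)).reshape(-1, 1)
    for _ in range(n_cols):
        arr = np.concatenate((arr, col), axis=1)
    return arr


def build_vectorized(n_rows, n_cols):
    """Build all repeated columns first, then concatenate once."""
    arr = np.zeros(n_rows).reshape(-1, 1)
    block = np.log(np.arange(2, n_rows + 2).repeat(n_cols).reshape(-1, n_cols))
    return np.concatenate((arr, block), axis=1)


# Small sizes keep the check quick; the question uses 5000 rows and 3*200 columns.
a = build_with_loop(50, 12)
b = build_vectorized(50, 12)
print(a.shape, np.allclose(a, b))
```

The vectorized version calls np.concatenate once instead of 600 times, so it avoids recopying the growing array on every iteration.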

Upvotes: 2

Mike O'Connor

Reputation: 2700

I am providing an answer here so as not to leave such a muddled record. First, I can report that most-serene frist's joblib code works. It is shorter, and it doesn't suffer from the limitation of the while True loop in my example, which only works efficiently with jobs that each take about the same amount of time. I see that the joblib project is currently maintained, and if you don't mind depending on a third-party library it may be an excellent solution. I may adopt it.

But, with my test function the time(1) real, the running time using the time wrapper, is about the same with either joblib or my poor man's code.

To answer the question of why the scaling of the running time in inverse proportion to the number of physical cores doesn't go as hoped, I had diligently prepared my test code that I presented here so as to produce similar output and to take about as long to run as the code of my actual project, per single-run without parallelism (and my actual project is likewise CPU-bound, not I/O-bound). I did so in order to test before undertaking the not-really-easy fitting of the simple code into my rather complicated project. But I have to report that in spite of that similarity, the results were much better with my actual project. I was surprised to see that I did get the sought inverse scaling of running time with the number of physical cores, more or less.

So I'm left supposing (and here's my tentative answer to my own question) that perhaps the OS scheduler is fickle and very sensitive to the type of job. And there could be effects due to other processes that may be running even if, as in my case, the other processes are hardly using any CPU time (I checked; they weren't).

Tip #1: Never name your joblib test code joblib.py (you'll get an import error). Tip #2: If you rename that joblib.py test file, delete the old joblib.py before running the renamed file (otherwise you'll still get an import error).
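A quick way to see whether a local file is shadowing an installed package is to ask Python where it would import the name from. The sketch below is for Python 3 and uses the standard-library json package as a stand-in; module_origin is a hypothetical helper name.

```python
import importlib.util


def module_origin(name):
    """Return the file path Python would import `name` from, or None."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return None
    return spec.origin


# 'json' is a standard-library stand-in here; substitute 'joblib' to check
# whether a local joblib.py is shadowing the installed package.
print(module_origin("json"))
```

If the printed path points into the current working directory rather than the site-packages directory, a local file is being picked up first.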

Upvotes: 1
