Pradeep Reddy Raamana

Reputation: 367

Why am I not seeing speed up via multiprocessing in Python?

I am trying to parallelize an embarrassingly parallel for loop (previously asked here) and settled on this implementation, which fits my parameters:

    with Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets, train_size_common, feat_sel_size, train_perc,
                                            total_test_samples, num_classes, num_features, label_set,
                                            method_names, pos_class_index, out_results_dir, exhaustive_search])
        partial_func_holdout = partial(holdout_trial_compare_datasets, *shared_inputs)

        with Pool(processes=num_procs) as pool:
            cv_results = pool.map(partial_func_holdout, range(num_repetitions))

The reason I need to use a proxy object (shared between processes) is that the first element of the shared proxy list, datasets, is a list of large objects (each about 200-300MB). This datasets list usually has 5-25 elements. I typically need to run this program on an HPC cluster.

Here is the question: when I run this program with 32 processes and 50GB of memory (num_repetitions=200, with datasets being a list of 10 objects, each 250MB), I do not see a speedup of even a factor of 16 (with 32 parallel processes). I do not understand why. Any clues? Any obvious mistakes or bad choices? Where can I improve this implementation? Any alternatives?

I am sure this has been discussed before, and the reasons can be varied and very specific to implementation - hence I request you to provide me your 2 cents. Thanks.

Update: I did some profiling with cProfile to get a better idea - here is some info, sorted by cumulative time.

In [19]: p.sort_stats('cumulative').print_stats(50)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: cumulative time
   List reduced from 4510 to 50 due to restriction <50>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    897/1    0.044    0.000  662.202  662.202 {built-in method builtins.exec}
        1    0.000    0.000  662.202  662.202 test_rhst.py:2(<module>)
        1    0.001    0.001  661.341  661.341 test_rhst.py:70(test_chance_classifier_binary)
        1    0.000    0.000  661.336  661.336 /Users/Reddy/dev/neuropredict/neuropredict/rhst.py:677(run)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:533(wait)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:263(wait)
       23  661.233   28.749  661.233   28.749 {method 'acquire' of '_thread.lock' objects}
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:261(map)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:637(get)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:634(wait)
    866/8    0.004    0.000    0.868    0.108 <frozen importlib._bootstrap>:958(_find_and_load)
    866/8    0.003    0.000    0.867    0.108 <frozen importlib._bootstrap>:931(_find_and_load_unlocked)
    720/8    0.003    0.000    0.865    0.108 <frozen importlib._bootstrap>:641(_load_unlocked)
    596/8    0.002    0.000    0.865    0.108 <frozen importlib._bootstrap_external>:672(exec_module)
   1017/8    0.001    0.000    0.863    0.108 <frozen importlib._bootstrap>:197(_call_with_frames_removed)
   522/51    0.001    0.000    0.765    0.015 {built-in method builtins.__import__}

The profiling info now sorted by time

In [20]: p.sort_stats('time').print_stats(20)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: internal time
   List reduced from 4510 to 20 due to restriction <20>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       23  661.233   28.749  661.233   28.749 {method 'acquire' of '_thread.lock' objects}
   115/80    0.177    0.002    0.211    0.003 {built-in method _imp.create_dynamic}
      595    0.072    0.000    0.072    0.000 {built-in method marshal.loads}
        1    0.045    0.045    0.045    0.045 {method 'acquire' of '_multiprocessing.SemLock' objects}
    897/1    0.044    0.000  662.202  662.202 {built-in method builtins.exec}
        3    0.042    0.014    0.042    0.014 {method 'read' of '_io.BufferedReader' objects}
2037/1974    0.037    0.000    0.082    0.000 {built-in method builtins.__build_class__}
      286    0.022    0.000    0.061    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:12(docformat)
     2886    0.021    0.000    0.021    0.000 {built-in method posix.stat}
       79    0.016    0.000    0.016    0.000 {built-in method posix.read}
      597    0.013    0.000    0.021    0.000 <frozen importlib._bootstrap_external>:830(get_data)
      276    0.011    0.000    0.013    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/sre_compile.py:250(_optimize_charset)
      108    0.011    0.000    0.038    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py:626(_construct_argparser)
     1225    0.011    0.000    0.050    0.000 <frozen importlib._bootstrap_external>:1233(find_spec)
     7179    0.009    0.000    0.009    0.000 {method 'splitlines' of 'str' objects}
       33    0.008    0.000    0.008    0.000 {built-in method posix.waitpid}
      283    0.008    0.000    0.015    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:128(indentcount_lines)
        3    0.008    0.003    0.008    0.003 {method 'poll' of 'select.poll' objects}
     7178    0.008    0.000    0.008    0.000 {method 'expandtabs' of 'str' objects}
      597    0.007    0.000    0.007    0.000 {method 'read' of '_io.FileIO' objects}

More profiling info, sorted by percall, was linked from the original post.

Update 2

The elements in the large list datasets I mentioned earlier are not usually that big; they are typically 10-25MB each. But depending on the floating point precision used and the number of samples and features, this can easily grow to 500MB-1GB per element. Hence I'd prefer a solution that can scale.

Update 3:

The code inside holdout_trial_compare_datasets uses scikit-learn's GridSearchCV, which internally uses the joblib library if we set n_jobs > 1 (or perhaps whenever we set it at all). This might lead to some bad interactions between multiprocessing and joblib. So I am trying another config where I do not set n_jobs at all (which should default to no parallelism within scikit-learn). Will keep you posted.

Upvotes: 12

Views: 1815

Answers (3)

georgexsh

Reputation: 16624

Based on the discussion in the comments, I ran a mini experiment comparing three implementations:

  • v1: essentially the same as your approach. Because partial(f1, *shared_inputs) unpacks proxy_manager.list immediately, the Manager list is not actually involved here; the data is passed to the workers through the internal queue of the Pool.
  • v2: makes real use of the Manager list. The worker function receives a ListProxy object and fetches the shared data through an internal connection to a server process.
  • v3: the child processes share data from the parent, taking advantage of the fork(2) system call.
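The v1 point can be checked without starting any processes. The following is only a rough sketch: trial is a hypothetical stand-in for holdout_trial_compare_datasets, and the small datasets list is made up for illustration. It compares the pickled size of a bare function with that of a partial that has the data bound to it, which is what the Pool has to serialize when it sends work to a worker.

```python
import pickle
from functools import partial


def trial(datasets, rep_index):
    """Hypothetical stand-in for holdout_trial_compare_datasets."""
    return rep_index, len(datasets)


# Small made-up stand-in for the large datasets list: 3 lists of 100,000 floats.
datasets = [[0.5] * 100_000 for _ in range(3)]

# Binding the data with partial makes it part of the callable itself.
bound = partial(trial, datasets)

# A module-level function is pickled by reference (module name + function name).
bare_bytes = len(pickle.dumps(trial))
# The partial is pickled together with everything bound to it.
bound_bytes = len(pickle.dumps(bound))

print(f"bare function:         {bare_bytes} bytes")
print(f"partial with datasets: {bound_bytes} bytes")
```

As far as I can tell from CPython's Pool implementation, the callable is pickled along with every batch of tasks sent by pool.map, so with large bound arguments the serialization cost is paid repeatedly rather than once.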

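# Benchmark: time v1, v2 and v3 with 2, 4, 8 and 16 worker processes.
import multiprocessing as mp
from functools import partial
from time import time

def f1(*args):
    # args[0] is the plain datasets list, already unpacked from the proxy
    for e in args[0]: pow(e, 2)

def f2(*args):
    # args[0] is a ListProxy; indexing it fetches datasets from the manager
    for e in args[0][0]: pow(e, 2)

def f3(n):
    # datasets is a module-level global inherited from the parent via fork
    for i in datasets: pow(i, 2)

def v1(np):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f1, *shared_inputs)
        with mp.Pool(processes=np) as pool:
            r = pool.map(pf, range(16))

def v2(np):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f2, shared_inputs)
        with mp.Pool(processes=np) as pool:
            r = pool.map(pf, range(16))

def v3(np):
    with mp.Pool(processes=np) as pool:
        r = pool.map(f3, range(16))

# Must be defined before any pool is created so forked workers can see it
datasets = [2.0 for _ in range(10 * 1000 * 1000)]
for f in (v1, v2, v3):
    print(f.__name__)
    for np in (2, 4, 8, 16):
        s = time()
        f(np)
        print("%s %.2fs" % (np, time()-s))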

Results were taken on a 16-core E5-2682 VPC; it is obvious that v3 scales better.
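Applied to the question, the v3 idea might look roughly like the sketch below. It assumes a Unix system where the fork start method is available. run_trial is a hypothetical stand-in for holdout_trial_compare_datasets, and _init_worker, _DATASETS and run_all are names made up for this example. With fork, the initializer arguments are inherited by the workers rather than pickled, so only the small repetition index travels through the Pool's queue.

```python
import multiprocessing as mp

# Filled in once per worker by the initializer below.
_DATASETS = None


def _init_worker(datasets):
    """Runs once in each worker and stores the inherited data in a global."""
    global _DATASETS
    _DATASETS = datasets


def run_trial(rep_index):
    """Hypothetical stand-in for holdout_trial_compare_datasets."""
    total = sum(len(d) for d in _DATASETS)
    return rep_index, total


def run_all(datasets, num_repetitions, num_procs):
    # Assumption: Unix only. The "fork" context is requested explicitly so
    # that workers inherit datasets from the parent instead of unpickling it.
    ctx = mp.get_context("fork")
    with ctx.Pool(processes=num_procs, initializer=_init_worker,
                  initargs=(datasets,)) as pool:
        # Only the integer index is sent to the workers for each task.
        return pool.map(run_trial, range(num_repetitions))


if __name__ == "__main__":
    demo = [[1.0] * 10, [2.0] * 20]  # tiny made-up data
    print(run_all(demo, num_repetitions=4, num_procs=2))
```

One caveat: the workers should treat the inherited data as read-only. Even then, CPython's reference counting writes to object headers, so some of the shared pages may still end up being copied over time.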

Upvotes: 2

user8241626

Reputation: 77

If you do not need a complex shared object, then only use a list of the most simple objects imaginable.

Then tell the workers to acquire the complex data that they can process in their own little world.

Try:

def myfunction(unitofworkid):
    # Each worker loads its own dataset, starting from a simple id
    thework = acquiredataset(unitofworkid)
    return holdout_trial_compare_datasets(thework, ...)

allwork = manager.list([datasetid1, datasetid2, ...])
theresult = manager.list()

while allwork:
    unitofworkid = allwork.pop()
    # Append to the shared result list instead of overwriting it
    theresult.append(myfunction(unitofworkid))

I hope that this makes sense. It should not take too much time to refactor in this direction. And you should see that {method 'acquire' of '_thread.lock' objects} number drop like a rock when you profile.
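One way this could look with a plain Pool instead of a managed list is sketched below. It is only an illustration: acquire_dataset is a hypothetical loader (a real one would read the dataset from disk by its id), process_one stands in for the real per-dataset work, and the fork context is an assumption made to keep the sketch self-contained on Unix.

```python
import multiprocessing as mp


def acquire_dataset(dataset_id):
    """Hypothetical loader; a real one would read from disk by id."""
    return [float(dataset_id)] * 1000


def process_one(dataset_id):
    """Load the data inside the worker, then reduce it to a small result."""
    data = acquire_dataset(dataset_id)
    return dataset_id, sum(data)


def run(dataset_ids, num_procs):
    # Assumption: Unix only, fork start method.
    ctx = mp.get_context("fork")
    with ctx.Pool(processes=num_procs) as pool:
        # Only ids go to the workers and only small results come back.
        return dict(pool.map(process_one, dataset_ids))


if __name__ == "__main__":
    print(run([1, 2, 3], num_procs=2))
```

Here nothing large crosses a process boundary, so no managed object is needed at all.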

Upvotes: 0

user8241626

Reputation: 77

{method 'acquire' of '_thread.lock' objects}

Looking at your profiler output, I would say that the shared object lock/unlock overhead overwhelms the speed gains of multiprocessing.

Refactor so that the work is farmed out to workers that do not need to talk to one another as much.

Specifically, if possible, derive one answer per data pile and then act on the accumulated results.

This is why Queues can seem so much faster: they involve a type of work that does not require an object that has to be 'managed' and so locked/unlocked.

Only 'manage' things that absolutely need to be shared between processes. Your managed list contains some very complicated looking objects...

A faster paradigm is:

allwork = manager.list([a, b, c])
theresult = manager.list()

and then

while allwork:
    unitofwork = allwork.pop()
    theresult.append(myfunction(unitofwork))

Upvotes: 0
