Ravi Saroch

Reputation: 960

Error in parallel processing of a Python program

def bagging_and_trees_growth(samples, network, tree_num):
    trees = []
    
    for i in range(tree_num):
        bootstrap_samples = bagging(samples)
        a_tree = tree_growth(network, bootstrap_samples)
        trees.append(a_tree)
        
    return trees
    
def agiled_random_forest(samples, network, size, processes=39):
   
    rforest = []
        
    #job_server = pp.Server(processes=processes)
    threadPool = ThreadPool(processes=processes)
     
    depfun = (find_best_split, stopping_condition, purity_gain, Gini_index, find_neighbors, tree_growth, bagging)
    dep_modules = ('networkx', 'numpy', 'math', 'random', 'sys', 'pNGF')
    
    
    tree_num_of_each_task = int(size/processes)
    
    #jobs = [pp_server.submit(bagging_and_trees_growth, (samples, network, tree_num_of_each_task), depfun, 'dep_modules) for x in range(processes)]
   
    jobs = [threadPool.apply_async(bagging_and_trees_growth, (samples, network, tree_num_of_each_task), depfun, dep_modules) for x in range(processes)]
    
    
    for job in jobs:
        rforest += job.get()
    
    threadPool.destroy()
    return rforest

It raises the following error about a mapping and a tuple:

TypeError: bagging_and_trees_growth() argument after ** must be a mapping, not tuple

How can I resolve this error, given that the pp module does not work in Python 3?

Upvotes: 1

Views: 165

Answers (1)

AKX

Reputation: 169298

You might be looking for something like this.

The idea here is that bagging_and_trees_growth no longer has an inner job loop: each call grows a single tree, and we rely on the thread pool (or, given the GIL, preferably a process pool, but that's up to you) to distribute the work efficiently.

Since the order in which the jobs run clearly makes no difference here, imap_unordered is likely the fastest high-level construct. One could also use apply_async, but it's more work.

import itertools
import multiprocessing.pool


def bagging_and_trees_growth(job):
    samples, network = job  # unpack the job tuple
    bootstrap_samples = bagging(samples)
    a_tree = tree_growth(network, bootstrap_samples)
    return a_tree


def agiled_random_forest(samples, network, size, processes=39):
    rforest = []
    with multiprocessing.pool.ThreadPool(processes=processes) as pool:
        # to use imap_unordered (the fastest high-level pool operation),
        # we need to pack each job into an object; since all we need here is 2 parameters, let's use a tuple.
        # set up a generator to generate the same job size times
        job_gen = itertools.repeat((samples, network), size)
        # do the work in parallel
        for result in pool.imap_unordered(bagging_and_trees_growth, job_gen):
            # could do something else with the result here;
            # in fact this could all just be `rforest = list(pool.imap...)`
            # in the simple case
            rforest.append(result)
    return rforest
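
For completeness, here is a minimal sketch of the apply_async route mentioned above. It also shows where the original TypeError comes from: the signature is apply_async(func, args=(), kwds={}, ...), so the depfun tuple passed as the third positional argument was treated as kwds, and the pool then tried to call func(*args, **kwds) with a tuple. The bagging and tree_growth bodies below are hypothetical stand-ins so the example runs on its own, and grow_one_tree and forest_with_apply_async are names made up for illustration.

```python
import random
from multiprocessing.pool import ThreadPool


def bagging(samples):
    # Hypothetical stand-in: draw a bootstrap sample with replacement.
    return [random.choice(samples) for _ in samples]


def tree_growth(network, bootstrap_samples):
    # Hypothetical stand-in: a real version would build a decision tree.
    return {"network": network, "n_samples": len(bootstrap_samples)}


def grow_one_tree(samples, network):
    # One job grows exactly one tree.
    return tree_growth(network, bagging(samples))


def forest_with_apply_async(samples, network, size, processes=4):
    with ThreadPool(processes=processes) as pool:
        # Only func and the args tuple are passed. The third positional
        # parameter of apply_async is kwds and must be a dict, which is
        # why passing depfun there raised the TypeError.
        jobs = [
            pool.apply_async(grow_one_tree, (samples, network))
            for _ in range(size)
        ]
        # get() blocks until the job is done and re-raises worker errors.
        return [job.get() for job in jobs]
```

Unlike pp, the multiprocessing pools need no depfun or dep_modules lists: a thread pool shares the interpreter's globals, and a process pool imports the module that defines the worker function.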

Upvotes: 1
