maciek

Reputation: 2107

Creating n processes for iterative task in python

I have a complex problem with Python's multiprocessing module. I have built a script that at one point has to call a multi-argument function (call_function) for each element of a specific list. My idea is to define an integer 'N' and divide this work among subprocesses.

li = [a, b, c, d, e]  # elements are ints
for element in li:
    call_function(element, string1, string2, int1)

call_summary_function()

The summary function will analyze the results obtained by all iterations of the loop. Now, I want each iteration to be carried out by a single subprocess, but there can never be more than N subprocesses running altogether. If the limit is reached, the main process should wait until one of the subprocesses ends and only then start another iteration. Also, call_summary_function needs to be called after all the subprocesses finish.

I have tried my best with the multiprocessing module, using Locks and global variables to keep track of the actual number of subprocesses running (to compare against N), but every time I get an error.

//--------------EDIT-------------//

Firstly, the main process code:

MAX_PROCESSES=3
lock=multiprocessing.Lock()
processes=0
k=0
while k < len(k_list):

    if processes<=MAX_PROCESSES: # running processes <= 'N' set by me

        p = multiprocessing.Process(target=single_analysis, args=(k_list[k],main_folder,training_testing,subsets,positive_name,ratio_list,lock,processes))
        p.start()
        k+=1

    else: time.sleep(1)


while processes>0: time.sleep(1)

Now: the function that is called by multiprocessing:

def single_analysis(k,main_folder,training_testing,subsets,positive_name,ratio_list,lock,processes):

    lock.acquire()
    processes += 1
    lock.release()

    # stuff to do

    lock.acquire()
    processes -= 1
    lock.release()

I get an error because the int value (the processes variable) is always equal to 0: the single_analysis() function seems to create a new, local processes variable. When I declare processes as global, reference it in single_analysis() with the global keyword, and print processes inside the function, I get 1 printed len(li) times...

Upvotes: 1

Views: 1519

Answers (1)

dano

Reputation: 94891

What you're describing is perfectly suited for multiprocessing.Pool - specifically its map method:

import multiprocessing
from functools import partial

def call_function(string1, string2, int1, element):
    # Do stuff here
    pass

if __name__ == "__main__":
    li=[a,b,c,d,e]
    p = multiprocessing.Pool(N)  # The pool will contain N worker processes.

    # Use partial so that we can pass a method that takes more than one argument to map.
    func = partial(call_function, string1, string2, int1)

    results = p.map(func, li)
    call_summary_function(results)

p.map will call call_function(string1, string2, int1, element) for each element in the li list. results will be a list containing the value returned by each call to call_function. You can pass that list to call_summary_function to process the results.

Upvotes: 1
