Amir

Reputation: 11096

How to use multiprocessing to parallelize two calls to the same function, with different arguments, in a for loop?

In a for loop, I call the same function twice but with different argument sets (argSet1, argSet2) that change on each iteration. I want to parallelize this, since one argument set makes the function run fast while the other makes it run slowly. Note that I do not want two separate for loops for this operation. I also have another requirement: each of these calls itself performs some parallel work, so because of my limited computational resources I do not want the call with either argSet1 or argSet2 to be running more than once at a time. Keeping exactly one call with each argument set running will let me utilize the CPU cores as much as possible. Here's how I normally do it without parallelization:

def myFunc(arg1, arg2):
    if arg1:
        print('do something that does not take too long')
    else:
        print('do something that takes long')

for i in range(10):
    argSet1 = arg1Storage[i]
    argSet2 = arg2Storage[i]
    myFunc(*argSet1)
    myFunc(*argSet2)

This definitely does not take advantage of the computational resources I have. Here's my attempt to parallelize the operation:

from multiprocessing import Process

def myFunc(arg1, arg2):
    if arg1:
        print('do something that does not take too long')
    else:
        print('do something that takes long')

for i in range(10):
    argSet1 = arg1Storage[i]
    argSet2 = arg2Storage[i]
    p1 = Process(target=myFunc, args=argSet1)
    p1.start()
    p2 = Process(target=myFunc, args=argSet2)
    p2.start()

However, this way every iteration spawns two new processes without waiting for the previous ones, so the function with each argument set ends up running many times at once and things become extremely slow. Given my limited knowledge of multiprocessing, I tried to improve this by adding p1.join() and p2.join() at the end of the for loop, but that still causes a slowdown: p1 finishes much faster, yet each iteration waits until p2 is done. I also thought about using multiprocessing.Value to communicate with the functions, but then I would have to add a while loop inside the function for each of the calls, which slows everything down again. Can someone offer a practical solution?
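For reference, the join variant I tried looks roughly like this; every iteration then runs at the speed of the slower call:

# join-per-iteration variant: each iteration blocks until both
# processes finish, so it is gated by the slower argument set
for i in range(10):
    argSet1 = arg1Storage[i]
    argSet2 = arg2Storage[i]
    p1 = Process(target=myFunc, args=argSet1)
    p2 = Process(target=myFunc, args=argSet2)
    p1.start()
    p2.start()
    p1.join()  # returns quickly
    p2.join()  # the loop still waits here for the slow call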

Upvotes: 1

Views: 2779

Answers (3)

kabanus

Reputation: 25895

Since I built this answer in patches, scroll down for the best solution to this problem.

You need to specify exactly how you want things to run. As far as I can tell, you want at most two processes running, but also at least two. Also, you do not want the heavy call to hold up the fast ones. One simple, non-optimal way to do this is:

from multiprocessing import Process

def func(counter, somearg):
    # busy-work proportional to counter, then print the tag
    j = 0
    for i in range(counter): j += i
    print(somearg)

def loop(counter, arglist):
    for i in range(10):
        func(counter, arglist[i])

# one process runs all the heavy calls, the other all the light calls
heavy = Process(target=loop, args=[1000000, ['heavy'+str(i) for i in range(10)]])
light = Process(target=loop, args=[500000, ['light'+str(i) for i in range(10)]])
heavy.start()
light.start()
heavy.join()
light.join()

The output here is (for one example run):

light0
heavy0
light1
light2
heavy1
light3
light4
heavy2
light5
light6
heavy3
light7
light8
heavy4
light9
heavy5
heavy6
heavy7
heavy8
heavy9

You can see the last part is sub-optimal, since you end up with a sequence of heavy runs, which means only one process is working instead of two.

An easy way to optimize this presents itself if you can estimate how much longer the heavy process runs. If it's twice as slow, as here, just run 7 iterations of heavy first, join the light process, and have it run the remaining 3.

Another way is to run the heavy calls as a pair of processes, so at first you have 3 processes until the fast one ends, and then continue with 2.
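For illustration, a minimal sketch of that variant, reusing func from the snippet above; the loop here walks its argument list directly, and the names are just placeholders:

from multiprocessing import Process

def loop(counter, arglist):
    for arg in arglist:
        func(counter, arg)

# split the 10 heavy calls across two processes of 5 each and run them
# alongside the light process: 3 processes at first, 2 once light finishes
heavy_a = Process(target=loop, args=[1000000, ['heavy'+str(i) for i in range(5)]])
heavy_b = Process(target=loop, args=[1000000, ['heavy'+str(i) for i in range(5, 10)]])
light = Process(target=loop, args=[500000, ['light'+str(i) for i in range(10)]])
for p in (heavy_a, heavy_b, light):
    p.start()
for p in (heavy_a, heavy_b, light):
    p.join()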

The main point is separating the heavy and light calls into different processes entirely, so while the fast calls complete one after the other you can work on your slow stuff. Once the fast one ends, it's up to you how elaborate you want to get, but I think for now estimating how to break up the heavy calls is good enough. Here it is for my example:

from multiprocessing import Process

def func(counter,somearg):
    j = 0
    for i in range(counter): j+=i
    print(somearg)

def loop(counter,amount,arglist):
    for i in range(amount):
        func(counter,arglist[i])

heavy1 = Process(target=loop,args=[1000000,7,['heavy1'+str(i) for i in range(7)]])
light = Process(target=loop,args=[500000,10,['light'+str(i) for i in range(10)]])
heavy2 = Process(target=loop,args=[1000000,3,['heavy2'+str(i) for i in range(7,10)]])
heavy1.start()
light.start()
light.join()
heavy2.start()
heavy1.join()
heavy2.join()

with output:

light0
heavy10
light1
light2
heavy11
light3
light4
heavy12
light5
light6
heavy13
light7
light8
heavy14
light9
heavy15
heavy27
heavy16
heavy28
heavy29

Much better utilization. You can of course make this more advanced by sharing a queue of the slow runs, so that when the fast processes are done they can join as workers on the slow queue; for only two different calls this may be overkill, though it is not much harder using the queue. The best solution:

from multiprocessing import Queue, Process
import queue

def func(index, counter, somearg):
    # busy-work proportional to counter, then report which worker ran it
    j = 0
    for i in range(counter): j += i
    print("Worker", index, ':', somearg)

def worker(index, q):
    # keep pulling jobs off the shared queue until it is drained
    try:
        while True:
            job, args = q.get(block=False)
            job(index, *args)
    except queue.Empty: pass

q = Queue()
# enqueue all jobs up front, light and heavy interleaved
for i in range(10):
    q.put((func, (500000, 'light'+str(i))))
    q.put((func, (1000000, 'heavy'+str(i))))

nworkers = 2
workers = []
for i in range(nworkers):
    # pass the queue explicitly so the workers do not rely on a global
    workers.append(Process(target=worker, args=(i, q)))
    workers[-1].start()
q.close()
for w in workers:
    w.join()

This is the best and most scalable solution for what you want. Output:

Worker 0 : light0
Worker 0 : light1
Worker 1 : heavy0
Worker 1 : light2
Worker 0 : heavy1
Worker 0 : light3
Worker 1 : heavy2
Worker 1 : light4
Worker 0 : heavy3
Worker 0 : light5
Worker 1 : heavy4
Worker 1 : light6
Worker 0 : heavy5
Worker 0 : light7
Worker 1 : heavy6
Worker 1 : light8
Worker 0 : heavy7
Worker 0 : light9
Worker 1 : heavy8
Worker 0 : heavy9

Upvotes: 2

Dhia

Reputation: 10609

One possible implementation could be as follows:

import concurrent.futures

list_of_args = [arg1, arg2]

def my_func(arg):
    # ...
    print('do something that takes long')

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for arg, result in zip(list_of_args, executor.map(my_func, list_of_args)):
            print('my_func({0}) => {1}'.format(arg, result))

if __name__ == '__main__':
    main()

executor.map works like the built-in map function: it calls the provided function once for each item in the iterable, distributing those calls across the pool's worker processes.
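Adapted to the two-argument myFunc from the question, a rough sketch could look like the following; the storage lists and the interleaving are assumptions, and executor.map simply takes one iterable per parameter:

import concurrent.futures

def myFunc(arg1, arg2):
    # placeholder for the real work
    return arg1, arg2

if __name__ == '__main__':
    # hypothetical stand-ins for the question's arg1Storage / arg2Storage
    arg1Storage = [(True, i) for i in range(10)]
    arg2Storage = [(False, i) for i in range(10)]

    # interleave the fast and slow argument sets so both kinds stay in flight
    all_args = [a for pair in zip(arg1Storage, arg2Storage) for a in pair]
    firsts = [a[0] for a in all_args]
    seconds = [a[1] for a in all_args]

    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # executor.map takes one iterable per parameter of myFunc
        for args, result in zip(all_args, executor.map(myFunc, firsts, seconds)):
            print('myFunc{0} => {1}'.format(args, result))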

Upvotes: 0

dnswlt

Reputation: 3105

You might want to use a multiprocessing.Pool of processes and map your myFunc into it, like so:

from multiprocessing import Pool
import time

def myFunc(arg1, arg2):
    if arg1:
        print('do something that does not take too long')
        time.sleep(0.01)
    else:
        print('do something that takes long')
        time.sleep(1)

def wrap(args):
    return myFunc(*args)

if __name__ == "__main__":
    p = Pool()
    argStorage = [(True, False), (False, True)] * 12
    p.map(wrap, argStorage)

I added a wrap function, since the function passed to p.map must accept a single argument. You could just as well adapt myFunc to accept a tuple, if that's possible in your case.

My sample argStorage consists of 24 items, where 12 of them take 1 sec to process, and 12 are done in 10 ms. In total, this script runs in 3-4 seconds (I have 4 cores).
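As an alternative to the wrap helper, on Python 3.3+ you can let Pool.starmap unpack the argument tuples for you; a minimal sketch along the same lines:

from multiprocessing import Pool
import time

def myFunc(arg1, arg2):
    if arg1:
        print('do something that does not take too long')
        time.sleep(0.01)
    else:
        print('do something that takes long')
        time.sleep(1)

if __name__ == "__main__":
    argStorage = [(True, False), (False, True)] * 12
    with Pool() as p:
        # starmap unpacks each (arg1, arg2) tuple into myFunc's two parameters
        p.starmap(myFunc, argStorage)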

Upvotes: 2
