Mashhood Ahmad

Reputation: 322

Concurrent Futures: When and how to implement?

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
import numpy as np
import time

#creating iterable
testDict = {}
for i in range(1000):
    testDict[i] = np.random.randint(1,10)
    

#default method
stime = time.time()    
newdict = []

for k, v in testDict.items():
    for i in range(1000):
        v = np.tanh(v)
    newdict.append(v)
    
etime = time.time()
print(etime - stime) 
#output: 1.1139910221099854  



#multi processing
stime = time.time()
testresult = []

def f(item):
    x = item[1]
    for i in range(1000):
        x = np.tanh(x)
    return x

def main(testDict):
    with ProcessPoolExecutor(max_workers = 8) as executor:
        futures = [executor.submit(f, item) for item in testDict.items()]
        for future in as_completed(futures):
            testresult.append(future.result())
            
if __name__ == '__main__':
    main(testDict)    

etime = time.time()
print(etime - stime)
#output: 3.4509658813476562

I'm learning multiprocessing and ran a test to check whether I have implemented this correctly. Judging by the timings, the concurrent version is about 3 times slower. So what's wrong?

My objective is to parallelize a script which mostly operates on a dictionary of around 500 items. In each loop, the values of those 500 items are processed and updated. This loops for, let's say, 5000 generations. None of the k,v pairs interact with other k,v pairs. [It's a genetic algorithm].

I am also looking for guidance on how to parallelize the objective described above. If I use the correct concurrent futures method on each function in my genetic algorithm code, where each function takes a dictionary as input and outputs a new dictionary, will it be useful? Any guides/resources/help is appreciated.

Edit: If I run this example: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example, it takes 3 times longer than the same check in a plain for loop.

Upvotes: 0

Views: 627

Answers (1)

Lie Ryan

Reputation: 64847

There are a couple of basic problems here. First, you're using numpy but you're not vectorizing your calculations. You won't get numpy's speed benefit with the way this code is written, and you might as well use the standard library math module, which is faster than numpy for this style of code:

# 0.089sec
import math
for k, v in testDict.items():
    for i in range(1000):
        v = math.tanh(v)
    newdict.append(v)

Only once you vectorise the operation do you see the benefit of numpy:

# 0.016sec
for k, v in testDict.items():
    arr = np.full(1000, v)
    arr2 = np.tanh(arr)
    newdict.append(arr2[-1])
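
Note that the snippet above applies tanh once to 1000 copies of v, while the question applies tanh 1000 times in a row to a single value, so the two do not compute the same result. A minimal sketch that keeps the repeated application but vectorises across the dictionary's values (which the question says are independent of each other) might look like this. The helper name iterate_tanh and the variable names are made up for illustration:

```python
import numpy as np

def iterate_tanh(values, steps=1000):
    """Apply tanh `steps` times to every element of `values` at once."""
    arr = np.asarray(values, dtype=float)
    for _ in range(steps):
        arr = np.tanh(arr)  # one vectorised call per step, covering all items
    return arr

# Same shape of data as in the question: 1000 keys, random integers in 1..9.
test_dict = {i: np.random.randint(1, 10) for i in range(1000)}
result = iterate_tanh(list(test_dict.values()))
new_dict = dict(zip(test_dict.keys(), result))
```

This still runs 1000 Python-level iterations, but each one processes the whole dictionary in a single numpy call instead of one scalar at a time.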

For comparison, your original single-threaded code runs in 1.171sec on my test machine. As you can see here, when it's not used properly, NumPy can be more than an order of magnitude slower than even pure Python.

Now on to why you're seeing what you're seeing.

To be honest, I can't replicate your timing results. Your original multiprocessing code runs in 0.299sec for me (macOS on Python 3.6), which is faster than the single-process code. But if I have to take a guess, you're probably using Windows? On some platforms like Windows, creating a child process and setting up an environment to run a multiprocessing task is very expensive, so using multiprocessing for a task that lasts less than a few seconds is of dubious benefit. If you are interested in why, read here.

Also, on platforms that lack a usable fork(), such as Windows or macOS since Python 3.8, the child process has to re-import the module when you use multiprocessing. If you put both pieces of code in the same file, each child has to run your single-threaded code before it can run the multiprocessing code. You'll likely want to put your test code in a function and protect the top-level code with an if __name__ == "__main__" block. On a Mac with Python 3.8 or higher, you can also revert to the fork start method by calling multiprocessing.set_start_method("fork"), provided you're not calling into the Mac's non-fork-safe framework libraries.
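
As a rough sketch of that restructuring, the benchmark could be laid out as below. It uses math.tanh instead of numpy, as suggested earlier, and the helper names run_serial, run_parallel and timed are invented for this example rather than taken from your code:

```python
import math
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def f(item):
    """Worker: apply tanh 1000 times to the value of one (key, value) pair."""
    x = item[1]
    for _ in range(1000):
        x = math.tanh(x)
    return x

def run_serial(test_dict):
    return [f(item) for item in test_dict.items()]

def run_parallel(test_dict, workers=8):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(f, item) for item in test_dict.items()]
        return [future.result() for future in as_completed(futures)]

def timed(fn, *args):
    """Return (result, elapsed seconds) for a single call of fn(*args)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start

if __name__ == "__main__":
    # Everything that builds data, starts processes, or measures time lives
    # under the guard, so a child that re-imports this module runs none of it.
    test_dict = {i: float(i % 9 + 1) for i in range(1000)}
    serial, t_serial = timed(run_serial, test_dict)
    parallel, t_parallel = timed(run_parallel, test_dict)
    print(f"serial: {t_serial:.3f}s  parallel: {t_parallel:.3f}s")
```

With this layout the parallel timing covers only the pool's startup and the work itself, not a repeat of the serial loop in every child.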

With that out of the way, on to your title question.

When you use multiprocessing, you need to copy data to the child process and copy the result back to the main process, and there's also a cost to spawning the child processes. To benefit from multiprocessing, you need to design your workload so that this overhead is negligible compared with the work each child does.
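
One way to shrink that overhead, sketched here under the assumption that the items are independent (as the question states), is to submit one task per chunk of the dictionary rather than one task per item, so each round trip to a child carries many items. The names process_chunk, split and parallel_update are hypothetical:

```python
import math
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk):
    """Process a whole list of (key, value) pairs in a single worker call."""
    out = []
    for key, value in chunk:
        for _ in range(1000):
            value = math.tanh(value)
        out.append((key, value))
    return out

def split(items, n_chunks):
    """Split a list into at most n_chunks consecutive, roughly equal parts."""
    size = max(1, math.ceil(len(items) / n_chunks))
    return [items[i:i + size] for i in range(0, len(items), size)]

def parallel_update(data, workers=4):
    """Return a new dict with every value processed, one chunk per task."""
    chunks = split(list(data.items()), workers)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = executor.map(process_chunk, chunks)
        return {k: v for chunk in results for k, v in chunk}

if __name__ == "__main__":
    data = {i: float(i % 9 + 1) for i in range(500)}
    updated = parallel_update(data)
    print(len(updated))
```

For a loop over many generations, it would probably also help to create the executor once and reuse it for every generation, instead of paying the startup cost each time.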

If your data comes from an external source, try loading it in the child processes: rather than having the main process load the data and then transfer it to the children, have the main process tell each child how to fetch its slice of the data. Here you're generating testDict in the main process, so if you can, parallelize that step and move it into the children instead.
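
A small sketch of that idea for this test case: the main process sends each child only a key range and a seed, and the child generates and processes its own slice. The function names and the seeding scheme are assumptions made for illustration, not part of your code:

```python
import math
import random
from concurrent.futures import ProcessPoolExecutor

def build_and_process(task):
    """Create this worker's slice of the data locally, then process it."""
    start, stop, seed = task
    rng = random.Random(seed)  # per-task seed keeps runs reproducible
    result = {}
    for key in range(start, stop):
        value = float(rng.randint(1, 9))  # same 1..9 range as the question
        for _ in range(1000):
            value = math.tanh(value)
        result[key] = value
    return result

def make_tasks(n_items, n_tasks, base_seed=0):
    """Describe the work as small (start, stop, seed) tuples."""
    size = math.ceil(n_items / n_tasks)
    return [(start, min(start + size, n_items), base_seed + i)
            for i, start in enumerate(range(0, n_items, size))]

if __name__ == "__main__":
    merged = {}
    with ProcessPoolExecutor(max_workers=4) as executor:
        for part in executor.map(build_and_process, make_tasks(1000, 4)):
            merged.update(part)
    print(len(merged))
```

Only the three-element task tuples travel to the children. The results still have to be copied back, so this helps most when the input is large relative to the output.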

Also, since you're using numpy: if you vectorise your operations properly, numpy releases the GIL while performing them, so you may be able to just use multithreading instead. That lets you take advantage of multiple threads in a single Python process, and you don't need to fork or copy data over to child processes, as threads share memory.
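
A minimal sketch of the threaded variant, assuming the values fit in one float array and that the blocks are large enough for the numpy calls to dominate (with only a few hundred elements, thread overhead may well outweigh any gain). The names iterate_tanh and threaded_tanh are made up for this example:

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def iterate_tanh(block, steps):
    """Apply tanh `steps` times to a private copy of one block."""
    out = block.astype(float)      # astype copies, so threads never share output
    for _ in range(steps):
        np.tanh(out, out=out)      # vectorised call, written back in place
    return out

def threaded_tanh(values, workers=4, steps=1000):
    """Split the values into blocks and process each block in its own thread."""
    blocks = np.array_split(np.asarray(values, dtype=float), workers)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        parts = list(executor.map(iterate_tanh, blocks, [steps] * len(blocks)))
    return np.concatenate(parts)

if __name__ == "__main__":
    values = np.random.randint(1, 10, size=1_000_000)
    result = threaded_tanh(values, workers=4, steps=10)
    print(result.shape)
```

Because threads share memory, nothing is pickled or sent between processes here, and no if __name__ == "__main__" guard is strictly required for correctness.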

Upvotes: 1
