Reputation: 1830
I use multiprocessing in my python code to run asynchronously a function:
import multiprocessing

results = []
po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_big_argument)))
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())
a_big_argument is a dictionary that I pass as an argument. It is big in the sense that it is between 10 and 100 MB, and it seems to have a big impact on the performance of my code.
I'm probably doing something inefficient here, since the performance of my code really went down with this new argument.
What is the best way to deal with a big dictionary? I don't want it to be loaded every time the function runs. Would creating a database and connecting to it be a solution?
Here is code you can run:
'''
Created on Mar 11, 2013

@author: Antonin
'''

import math
import multiprocessing
import random


# generate an artificially big dictionary
def generateBigDict():
    myBigDict = {}
    for key in range(0, 1000000):
        myBigDict[key] = 1
    return myBigDict


def myMainFunction():
    # load the dictionary
    myBigDict = generateBigDict()

    # create a list on which we will asynchronously run the subfunction
    myList = []
    for list_element in range(0, 20):
        myList.append(random.randrange(0, 1000000))

    # an empty set to receive results
    set_of_results = set()

    # there is a for loop here on one of the arguments
    for loop_element in range(0, 150):
        results = []
        # asynchronously run the subfunction
        po = multiprocessing.Pool()
        for list_element in myList:
            results.append(po.apply_async(mySubFunction, (loop_element, list_element, myBigDict)))
        po.close()
        po.join()
        for r in results:
            set_of_results.add(r.get())

    for element in set_of_results:
        print(element)


def mySubFunction(loop_element, list_element, myBigDict):
    intermediaryResult = myBigDict[list_element]
    finalResult = intermediaryResult + loop_element
    return math.log(finalResult)


if __name__ == '__main__':
    myMainFunction()
Upvotes: 1
Views: 4838
Reputation: 1830
I used multiprocessing.Manager to do it.
import multiprocessing

manager = multiprocessing.Manager()
a_shared_big_dictionary = manager.dict(a_big_dictionary)

results = []
po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_shared_big_dictionary)))
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())
Now, it is much faster.
Upvotes: 3
Reputation: 31631
Any argument you pass to one of the Pool methods (e.g. apply_async) needs to be pickled, sent to the worker processes via pipes, and unpickled in the worker processes. This pickle/pass/unpickle process can be expensive in time and memory, especially if you have a large object graph, since each worker process must create a separate copy.
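As a rough illustration of that cost (a sketch added here, not part of this answer), the following measures how large the pickled dictionary from the question actually is, i.e. the payload that every apply_async call has to push through the pipe when the dictionary is passed as an argument:

import pickle

# a dictionary comparable to the one in the question: ~1,000,000 integer keys
a_big_argument = {key: 1 for key in range(1000000)}

# roughly the payload shipped to a worker for each task that receives the dictionary
payload = pickle.dumps(a_big_argument, protocol=pickle.HIGHEST_PROTOCOL)
print("pickled size: %.1f MB" % (len(payload) / 1e6))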
There are many different ways to avoid these pickles, depending on the exact shape of your problem. Since your workers are only reading your dictionary and not writing to it, you can safely reference it directly from your function (i.e. not pass it to apply_async) and rely on fork() to avoid creating a copy in the worker processes.
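A minimal sketch of that approach, adapted from the question's example (it assumes the dictionary can be built at module level, and a platform where Pool uses fork(), e.g. Linux, so workers inherit the global instead of receiving a pickled copy):

import math
import multiprocessing
import random

# module-level: built once in the parent and inherited by forked workers,
# instead of being passed to apply_async for every task
myBigDict = {key: 1 for key in range(1000000)}


def mySubFunction(loop_element, list_element):
    # the dictionary is reached through the module global, not through an argument
    return math.log(myBigDict[list_element] + loop_element)


if __name__ == '__main__':
    myList = [random.randrange(1000000) for _ in range(20)]
    po = multiprocessing.Pool()
    results = [po.apply_async(mySubFunction, (1, key)) for key in myList]
    po.close()
    po.join()
    print(sorted(r.get() for r in results))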
Even better, you can change mySubFunction() so that it accepts intermediaryResult as an argument instead of looking it up using list_element and myBigDict. (You may be able to do this with a closure, but I am not 100% sure that pickle won't try to copy the closed-over myBigDict object as well.)
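That reshaping could look roughly like this (a sketch, not the answerer's code): the parent process does the dictionary lookup, so each task only carries two small integers.

import math
import multiprocessing


def mySubFunction(loop_element, intermediaryResult):
    # the worker never sees the big dictionary at all
    return math.log(intermediaryResult + loop_element)


if __name__ == '__main__':
    myBigDict = {key: 1 for key in range(1000000)}
    myList = [7, 42, 123456]      # example keys
    loop_element = 3

    po = multiprocessing.Pool()
    results = [
        # do the lookup here, in the parent, and ship only the small result
        po.apply_async(mySubFunction, (loop_element, myBigDict[list_element]))
        for list_element in myList
    ]
    po.close()
    po.join()
    print([r.get() for r in results])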
Alternatively, you can put myBigDict in some place where all processes can share it safely, e.g. one of the simple persistence methods, such as dbm or sqlite, and have workers access it from there.
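A rough sketch of the sqlite variant (the table and file names are illustrative, not part of this answer): the dictionary is written to a database file once, and workers read from that file instead of receiving the data through apply_async.

import math
import multiprocessing
import sqlite3

DB_PATH = 'big_dict.sqlite'   # hypothetical file name


def buildDb():
    # write the big dictionary to disk once, in the parent
    conn = sqlite3.connect(DB_PATH)
    conn.execute('CREATE TABLE IF NOT EXISTS big_dict (key INTEGER PRIMARY KEY, value INTEGER)')
    conn.executemany('INSERT OR REPLACE INTO big_dict VALUES (?, ?)',
                     ((key, 1) for key in range(1000000)))
    conn.commit()
    conn.close()


def mySubFunction(loop_element, list_element):
    # each call opens its own connection (in practice you might open one per worker)
    conn = sqlite3.connect(DB_PATH)
    (value,) = conn.execute('SELECT value FROM big_dict WHERE key = ?', (list_element,)).fetchone()
    conn.close()
    return math.log(value + loop_element)


if __name__ == '__main__':
    buildDb()
    po = multiprocessing.Pool()
    results = [po.apply_async(mySubFunction, (1, key)) for key in (7, 42, 123456)]
    po.close()
    po.join()
    print([r.get() for r in results])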
Unfortunately, all these solutions require that you change the shape of your task functions. Avoiding this "shape-changing" is one reason why people like "real" CPU threads.
Upvotes: 1
Reputation: 1801
See the answer to the question Shared-memory objects in python multiprocessing.
It suggests either using multiprocessing.Array to pass arrays to subprocesses or relying on fork().
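A minimal sketch of the multiprocessing.Array idea (an illustration of that suggestion, not code from the linked answer; init_worker and shared_values are names introduced here): because the question's dictionary maps dense integer keys to integer values, it can be flattened into a shared integer array that workers index without any per-task pickling.

import math
import multiprocessing


def init_worker(shared_values):
    # stash the shared array in a module global inside each worker
    global values
    values = shared_values


def mySubFunction(loop_element, list_element):
    return math.log(values[list_element] + loop_element)


if __name__ == '__main__':
    # dense integer keys 0..999999 with value 1, stored in shared memory
    shared_values = multiprocessing.Array('i', [1] * 1000000, lock=False)

    po = multiprocessing.Pool(initializer=init_worker, initargs=(shared_values,))
    results = [po.apply_async(mySubFunction, (1, key)) for key in (7, 42, 123456)]
    po.close()
    po.join()
    print([r.get() for r in results])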
Upvotes: 1