Reputation: 1079
At first let me show you the current setup I have:
import multiprocessing.pool
from contextlib import closing
import os
def big_function(param):
process(another_module.global_variable[param])
def dispatcher():
# sharing read-only global variable taking benefit from Unix
# which follows policy copy-on-update
# https://stackoverflow.com/questions/19366259/
another_module.global_variable = huge_list
# send indices
params = range(len(another_module.global_variable))
with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
multiprocessing_result = list(p.imap_unordered(big_function, params))
return multiprocessing_result
Here I use shared variable updated before creating process pool, which contains huge data, and that indeed gained me speedup, so it seem to be not pickled now. Also this variable belongs to the scope of an imported module (if it's important).
When I tried to create setup like this:
another_module.global_variable = []
p = multiprocessing.pool.Pool(processes=os.cpu_count())
def dispatcher():
# sharing read-only global variable taking benefit from Unix
# which follows policy copy-on-update
# https://stackoverflow.com/questions/19366259/
another_module_global_variable = huge_list
# send indices
params = range(len(another_module.global_variable))
multiprocessing_result = list(p.imap_unordered(big_function, params))
return multiprocessing_result
p
"remembered" that global shared list was empty and refused to use new data when was called from inside the dispatcher.
Now here is the problem: processing ~600 data objects on 8 cores with the first setup above, my parallel computation runs 8 sec, while single-threaded it works 12 sec.
This is what I think: as long, as multiprocessing pickles data, and I need to re-create processes each time, I need to pickle function big_function()
, so I lose time on that. The situation with data was partially solved using global variable (but I still need to recreate pool on each update of it).
What can I do with instances of big_function()
(which depends on many other functions from other modules, numpy, etc)? Can I create os.cpu_count()
of it's copies once and for all, and somehow feed new data into them and receive results, reusing workers?
Upvotes: 2
Views: 1908
Reputation: 1604
Just to go over 'remembering' issue:
another_module.global_variable = []
p = multiprocessing.pool.Pool(processes=os.cpu_count())
def dispatcher():
another_module_global_variable = huge_list
params = range(len(another_module.global_variable))
multiprocessing_result = list(p.imap_unordered(big_function, params))
return multiprocessing_result
What seems to be the problem is when you are creating Pool
instance.
Why is that?
It's because when you create instance of Pool
, it does set up number of workers (by default equal to a number of CPU cores) and they are all started (forked) at that time. That means workers have a copy of parents global state (and another_module.global_variable
among everything else), and with copy-on-write policy, when you update value of another_module.global_variable
you change it in parent's process. Workers have a reference to the old value. That is why you have a problem with it.
Here are couple of links that can give you more explanation on this: this and this.
Here is a small snippet where you can switch lines where global variable value is changed and where process is started, and check what is printed in child process.
from __future__ import print_function
import multiprocessing as mp
glob = dict()
glob[0] = [1, 2, 3]
def printer(a):
print(globals())
print(a, glob[0])
if __name__ == '__main__':
p = mp.Process(target=printer, args=(1,))
p.start()
glob[0] = 'test'
p.join()
This is the Python2.7 code, but it works on Python3.6 too.
What would be the solution for this issue?
Well, go back to first solution. You update value of imported module's variable and then create pool of processes.
Now the real issue with the lack of speedup.
Here is the interesting part from documentation on how functions are pickled:
Note that functions (built-in and user-defined) are pickled by “fully qualified” name reference, not by value. This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function’s code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised.
This means that your function pickling should not be a time wasting process, or at least not by itself. What causes lack of speedup is that for ~600 data objects in list that you pass to imap_unordered
call, you pass each one of them to a worker process. Once again, underlying implementation of multiprocessing.Pool
may be the cause of this issue.
If you go deeper into multiprocessing.Pool
implementation, you will see that two Threads
using Queue
are handling communication between parent and all child (worker) processes. Because of this and that all processes constantly require arguments for function and constantly return responses, you end up with very busy parent process. That is why 'a lot' of time is spent doing 'dispatching' work passing data to and from worker processes.
What to do about this?
Try to increase number of data objects that are processes in worker process at any time. In your example, you pass one data object after other and you can be sure that each worker process is processing exactly one data object at any time. Why not increase the number of data objects you pass to worker process? That way you can make each process busier with processing 10, 20 or even more data objects. From what I can see, imap_unordered
has an chunksize
argument. It's set to 1
by default. Try increasing it. Something like this:
import multiprocessing.pool
from contextlib import closing
import os
def big_function(params):
results = []
for p in params:
results.append(process(another_module.global_variable[p]))
return results
def dispatcher():
# sharing read-only global variable taking benefit from Unix
# which follows policy copy-on-update
# https://stackoverflow.com/questions/19366259/
another_module.global_variable = huge_list
# send indices
params = range(len(another_module.global_variable))
with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
multiprocessing_result = list(p.imap_unordered(big_function, params, chunksize=10))
return multiprocessing_result
Couple of advices:
params
as a list of indexes, that you use to pick particular data object in big_function
. You can create tuples that represent first and last index and pass them to big_function
. This can be a way of increasing chunk of work. This is an alternative approach to the one I proposed above.Pool(processes=os.cpu_count())
, you can omit it. It by default takes number of CPU cores.Sorry for the length of answer or any typo that might have sneaked in.
Upvotes: 2