Reputation: 57
I'm new to Python multiprocessing and am experimenting a little before using it seriously. Right now I'm using pool.map(), which seems to work great, but I would like the pool jobs to finish before continuing with the rest of my program:
import time
import multiprocessing as mp

res1 = []

def my_func(x):
    print(mp.current_process())
    res1.append(x**x)

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222])
    toc = time.time()
    t = toc-tic
    print(t)

if __name__ == "__main__":
    main()

print('Hi')
Right now the "hi" is printed in between my processes, but I would like to have the calculation processes finish first and at the end it should print "hi".
I tried out pool.close() and pool.join() but inside of my_func() it seems to change nothing and outside, say in main(), it does not know my pool object (of course since it is declared inside of main())
I know this is just a testing program, but I need this concept for a far larger project in my thesis. So i appreciate every help I can get. Thanks a lot in advance!
Upvotes: 0
Views: 4099
Reputation: 44138
A few things:
First, x**x with the values of x that you are passing is a very large number and will take quite a while to calculate. On my desktop with 8 logical processors (cpu_count() returns 8), the map call took 99 seconds to complete -- but it does complete.
Second, the global variable res1 to which each process is appending is unique to each process. Each process runs in its own address space (this is a property of multiprocessing) and therefore has its own copy of res1, which is why the main process's res1 will be empty upon return from the call to map. Instead, your worker function, i.e. my_func, should return its result, and then the return value from map will be a list of all the return values.
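As an aside, if you genuinely needed a single list that every worker appends to, a multiprocessing.Manager list can be shared across processes. This is only a minimal sketch of that alternative, not part of the code below; the two-argument worker signature and the functools.partial usage are my own additions:

import multiprocessing as mp
from functools import partial

def my_func(shared, x):
    shared.append(x * x)

def main():
    with mp.Manager() as manager:
        shared = manager.list()          # a list proxy that all worker processes can mutate
        with mp.Pool(mp.cpu_count()) as pool:
            pool.map(partial(my_func, shared), [1, 2, 3, 4])
        print(list(shared))              # e.g. [1, 4, 9, 16] (append order may vary)

if __name__ == "__main__":
    main()

That said, simply returning values from the worker, as done below, is usually the simpler approach.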
Third, and this is a property of how processes are created under Windows: any statement at global scope that is not within the if __name__ == "__main__": block will be executed by every newly created process upon its initialization, and that is why you see Hi printed cpu_count() times immediately. You should remove the call to print('Hi') at global scope and place it in my_func (see the last code example).
Because life is short, I have modified my_func just to return the square of its argument:
import time
import multiprocessing as mp

def my_func(x):
    print(mp.current_process())
    return x * x

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222])
    print('Hi')
    toc = time.time()
    t = toc-tic
    print(t)
    print(results)

if __name__ == "__main__":
    main()
Prints:
8
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
Hi
0.17300081253051758
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]
You will notice that although the pool size was 8, 7 of the results were processed by a single process in the pool. The reason is as follows. The 8 "tasks" to be processed, represented by the 8 numbers in the iterable passed to the map method, are placed on an input queue of tasks to be processed in "chunks" of a certain size. If you do not specify the chunksize argument, it defaults to None and the map method computes a suitable value based on the size of the iterable and the pool size; in this case a chunksize of 1 is used. Idle processes grab the next chunk of tasks from the input queue and execute it. Here, one of the processes in the pool grabbed a chunk and executed it so quickly that it was able to go back and grab the next chunk and execute it before any other process in the pool was dispatched on another processor. In fact, it was able to do this 6 more times.
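For reference, this is roughly how the default chunksize is chosen, paraphrased from CPython's multiprocessing.pool source (the exact code may differ between versions; the function name here is my own):

# Approximately what Pool.map does when chunksize is None:
def default_chunksize(n_tasks, pool_size):
    chunksize, extra = divmod(n_tasks, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

# With 8 tasks and a pool of 8, divmod(8, 32) == (0, 8), so the chunksize becomes 1.
print(default_chunksize(8, 8))  # 1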
By making my_func run a bit longer by putting in a call to time.sleep, and by ensuring that a chunksize value of 1 is used, we give each process in the pool a chance to grab a task:
import time
import multiprocessing as mp

def my_func(x):
    time.sleep(.1)
    print(mp.current_process())
    return x * x

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222], chunksize=1)
    toc = time.time()
    t = toc-tic
    print(t)
    print(results)

if __name__ == "__main__":
    main()

print('Hi')
Prints:
8
Hi
Hi
Hi
Hi
Hi
Hi
Hi
Hi
<SpawnProcess name='SpawnPoolWorker-1' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-4' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-2' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-6' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-7' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-5' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-8' parent=3116 started daemon>
0.28499770164489746
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]
Hi
Similarly, by specifying chunksize=8, we guarantee that all 8 tasks will be processed by one process while the other 7 sit idle:
import time
import multiprocessing as mp

def my_func(x):
    time.sleep(.1)
    print(mp.current_process())
    print('Hi')
    return x * x

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222], chunksize=8)
    toc = time.time()
    t = toc-tic
    print(t)
    print(results)

if __name__ == "__main__":
    main()
Prints:
8
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
0.9750022888183594
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]
Needless to say, this is a poor chunksize value to be used for this iterable/pool size combination.
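As for the original question about waiting for the pool: map itself blocks until every task has completed, so anything placed after the map call (or inside the if __name__ == "__main__": block after main()) runs only once all the results are in. Only if you switch to a non-blocking method such as apply_async or imap_unordered do you need pool.close() followed by pool.join(), and those must be called in the scope where the pool was created, e.g. in main(). A minimal sketch of that pattern, with a hypothetical helper named square that is not from your original code:

import multiprocessing as mp

def square(x):  # hypothetical worker, stands in for your real calculation
    return x * x

if __name__ == "__main__":
    pool = mp.Pool(mp.cpu_count())
    async_results = [pool.apply_async(square, (n,)) for n in range(8)]  # non-blocking submissions
    pool.close()   # no more tasks may be submitted
    pool.join()    # block until every worker has finished
    print([r.get() for r in async_results])
    print('Hi')    # printed exactly once, after all the work is done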
Upvotes: 1