Reputation: 4807
I am trying the following code:
from multiprocessing import Pool

def f(x):
    return x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))
As I understand it, the 5 worker processes get 0, 1, 2, 3, 4 to work on first. If worker 1 finishes its job, does it get 5 right away while the other workers are still busy with 1, 2, 3, 4? Or do all workers have to finish before the next batch all together gets 5, 6, 7, 8, 9, and so on? If the latter happens, how do I change the above code so that the moment a worker goes idle it gets a new job assigned to it?
How do I test the implementation?
Upvotes: 4
Views: 3018
Reputation: 1237
Yes, this situation is possible. The input is partitioned into separate tasks first. The issue comes about when the tasks are unequally sized (in terms of processing time) and there are too few of them to fill in the gaps.
From the documentation:
map(func, iterable[, chunksize])
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
To illustrate this behavior I changed f(x) so that it takes x seconds to complete.
from multiprocessing import Pool
import time
import threading

def f(x):
    print('x: ' + str(x) + '\tThread ID: ' + str(threading.get_ident()))
    time.sleep(x)

if __name__ == '__main__':
    chunksize = 3
    with Pool(2) as p:
        p.map(f, [10, 1, 1, 1, 1, 1], chunksize)
The input array [10, 1, 1, 1, 1, 1] is partitioned into len(arr) / chunksize = 2 groups:
[10, 1, 1] # For thread 1, takes 12 seconds to finish
[ 1, 1, 1] # For thread 2, takes 3 seconds to finish
So, thread 2 will finish after 3 seconds, while thread 1 will continue working for another 9 seconds.
Example output:
x: 10 Thread ID: 8556
x: 1 Thread ID: 59180
x: 1 Thread ID: 59180
x: 1 Thread ID: 59180
x: 1 Thread ID: 8556
x: 1 Thread ID: 8556
If you find yourself in this situation, then you can force a smaller chunksize. A value of 1 ensures as balanced a workload as possible, at the cost of higher overhead.
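One way to see the trade-off is to time the same workload with chunksize=3 versus chunksize=1 (a sketch; the sleeps are scaled down by a factor of 10 so the demo runs quickly, but the task list mirrors the example above):

```python
from multiprocessing import Pool
import time

def f(x):
    time.sleep(x / 10)  # scaled-down version of the example workload
    return x

if __name__ == '__main__':
    tasks = [10, 1, 1, 1, 1, 1]
    for chunksize in (3, 1):
        start = time.monotonic()
        with Pool(2) as p:
            p.map(f, tasks, chunksize)
        elapsed = time.monotonic() - start
        print('chunksize=%d took %.1fs' % (chunksize, elapsed))
```

With chunksize=3 one worker is stuck with the whole [10, 1, 1] chunk (about 1.2 s here), while with chunksize=1 the second worker drains the short tasks while the first sleeps (about 1.0 s), so the smaller chunksize finishes sooner despite the extra dispatch overhead.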
Upvotes: 1
Reputation: 256
The pool hands an idle worker a new task immediately (adding to your example). Notice how the worker running task 4 took long enough that the 12th task was able to start before it finished.
PS I just noticed you forgot 10.
from multiprocessing import Pool
import time
import random

def f(x):
    print("Enter %s" % x)
    time.sleep(random.randrange(1, 100, 1) / 10.0)
    print("Exit %s" % x)
    return x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))
Enter 0
Enter 1
Enter 2
Enter 3
Enter 4
Exit 0
Enter 5
Exit 3
Enter 6
Exit 2
Enter 7
Exit 5
Enter 8
Exit 1
Enter 9
Exit 6
Enter 11
Exit 11
Enter 12
Exit 4
Enter 13
Exit 7
Exit 12
Exit 9
Exit 8
Exit 13
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]
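Another way to observe per-task scheduling, without parsing interleaved prints, is imap_unordered, which yields results in completion order rather than submission order (a sketch; the input values and sleep scaling are arbitrary choices for the demo):

```python
from multiprocessing import Pool
import time

def slow(x):
    time.sleep(x / 10)  # longer inputs finish later
    return x

if __name__ == '__main__':
    with Pool(2) as p:
        # With chunksize=1, each idle worker grabs the next task immediately,
        # so the short tasks tend to come back before the long one.
        done = list(p.imap_unordered(slow, [5, 1, 1, 1], chunksize=1))
    print(done)
```

This typically prints the three 1s before the 5: the second worker keeps picking up short tasks while the first is still sleeping on the long one, which is exactly the one-task-at-a-time behavior the question asks about.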
Upvotes: 2