Zanam

Reputation: 4807

Multiprocessing: Order of Execution

I am trying the following code:

from multiprocessing import Pool

def f(x):    
    return x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))

As I understand it, the 5 worker processes get 0, 1, 2, 3, 4 to operate on. If worker 1 finishes its job, does it get 5 right away while the other workers are still busy with 1, 2, 3, 4, or does the whole batch have to finish so that the next batch 5, 6, 7, 8, 9 is handed out all together? If the latter happens, how do I implement the above code so that the moment a worker goes idle it gets a new job assigned to it?

How do I test the implementation?

Upvotes: 4

Views: 3018

Answers (2)

msitt

Reputation: 1237

Yes, this situation is possible. The input is partitioned into separate tasks first. The issue comes about when the tasks are unequally sized (in terms of processing time) and there are too few of them to fill in the gaps.

From the documentation:

map(func, iterable[, chunksize])

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

Example

To illustrate this behavior I changed f(x) so it takes x seconds to complete.

from multiprocessing import Pool
import time
import threading

def f(x):
    print('x: ' + str(x) + '\tThread ID: ' + str(threading.get_ident()))
    time.sleep(x)

if __name__ == '__main__':
    chunksize = 3
    with Pool(2) as p:
        p.map(f, [10, 1, 1, 1, 1, 1], chunksize)

The input array [10, 1, 1, 1, 1, 1] is partitioned into len(arr) / chunksize = 2 chunks:

[10, 1, 1]  # For worker 1, takes 12 seconds to finish
[ 1, 1, 1]  # For worker 2, takes 3 seconds to finish

So worker 2 will finish after 3 seconds, while worker 1 keeps working for another 9 seconds. (Pool workers are separate processes; threading.get_ident() just reports each worker's main-thread ID, which is enough here to tell the two workers apart.)

Example output:

x: 10   Thread ID: 8556
x: 1    Thread ID: 59180
x: 1    Thread ID: 59180
x: 1    Thread ID: 59180
x: 1    Thread ID: 8556
x: 1    Thread ID: 8556

If you find yourself in this situation, then you can force a smaller chunksize. A value of 1 ensures as balanced a workload as possible, at the cost of higher overhead.
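A rough way to verify this is to time the same uneven workload with both chunk sizes. A minimal sketch, with the sleep times scaled down from the example above so it runs quickly (the exact timings will vary with pool startup overhead):

```python
from multiprocessing import Pool
import time

def f(x):
    # Simulate a task whose duration is proportional to its input.
    time.sleep(x)
    return x

def timed_map(chunksize):
    # Run the same uneven workload and report the wall-clock time.
    start = time.monotonic()
    with Pool(2) as p:
        p.map(f, [1.0, 0.1, 0.1, 0.1, 0.1, 0.1], chunksize)
    return time.monotonic() - start

if __name__ == '__main__':
    t3 = timed_map(3)  # one worker gets [1.0, 0.1, 0.1] -> roughly 1.2 s
    t1 = timed_map(1)  # tasks handed out one at a time -> roughly 1.0 s
    print('chunksize=3: %.2fs, chunksize=1: %.2fs' % (t3, t1))
```

With chunksize=3 one worker is stuck with the slow task plus two more, while with chunksize=1 the idle worker keeps pulling fresh tasks, so the second run should finish sooner.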

Upvotes: 1

Jon Malachowski

Reputation: 256

The pool hands a new task to a worker as soon as it goes idle (extending your example). Notice how task 4 took long enough that the 12th task was able to start before it finished.

PS I just noticed you skipped 10 in your list.

from multiprocessing import Pool
import time
import random

def f(x):
    print("Enter %s" % x)
    time.sleep(random.randrange(1, 100, 1) / 10.0)
    print("Exit %s" % x)
    return x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))

Enter 0
Enter 1
Enter 2
Enter 3
Enter 4
Exit 0
Enter 5
Exit 3
Enter 6
Exit 2
Enter 7
Exit 5
Enter 8
Exit 1
Enter 9
Exit 6
Enter 11
Exit 11
Enter 12
Exit 4
Enter 13
Exit 7
Exit 12
Exit 9
Exit 8
Exit 13
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]
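If you want to consume results as they complete rather than in input order, a sketch using imap_unordered (a standard Pool method) makes the completion order directly visible; the printed order will vary from run to run:

```python
from multiprocessing import Pool
import time

def f(x):
    # Larger inputs sleep longer, so they tend to finish later.
    time.sleep(x / 10.0)
    return x

if __name__ == '__main__':
    with Pool(5) as p:
        # imap_unordered yields each result as soon as its task completes,
        # so a slow task never holds back the faster ones.
        for result in p.imap_unordered(f, [9, 1, 8, 2, 7, 3]):
            print(result)
```

By contrast, p.map always returns results in the original input order, even though the work itself completes out of order.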

Upvotes: 2
