mafia

Reputation: 21

Calling a multiprocessing program inside a for loop in Python

I am working on a project where I need to run multiprocessing code inside a for loop (I have a list of y values). I have this script for a single execution, but I have no idea how to use it inside a for loop over my list of y values.

from multiprocessing import Process, Manager
import time
import os
start_time = time.time()
cpu = int(os.cpu_count())

y = 1

def dothing(L, i,y):  # the managed list `L` passed explicitly.
    for j in i:
        L.append(j*y)

if __name__ == "__main__":
    with Manager() as manager:
        L = manager.list()  # <-- can be shared between processes.
        x = list(range(110))
        processes = []
        size = len(x)//cpu
        for i in range(cpu):
            if i < cpu-1:
                p = Process(target=dothing, args=(L,x[i*size:size*(i+1)],y))  # Passing the list
            else:
                p = Process(target=dothing, args=(L,x[i*size:],y))
                
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
        print(L)
        print("--- %s seconds ---" % (time.time() - start_time))

For y=1 I will get a list L, which I will then use for a sliding correlation against a DataFrame column containing 2 million (20 lakh) rows. For y=2 I have to do the same sliding correlation, and likewise for y=3 and so on up to y=2000. So I want to make this multiprocessing code work like a normal function, so that I can call multiprocessing_func(y, some other values...) and get back a list L for my task.

Upvotes: 1

Views: 202

Answers (1)

Booboo

Reputation: 44283

There is a fundamental problem with your approach, assuming I understand what you are trying to do. The problem is that you have no control over how the new subprocesses you create get dispatched to run, and therefore no control over the order in which they append their results to the managed list you are passing to them. Suppose we had the case of y = 2 and an input list of 2 elements, [0, 1]. The resulting list could end up being [2, 0], which is probably not what you want.
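A minimal sketch of the race (the helper append_one is my own illustration, not code from the question); with one process per element, whichever process the OS runs first appends first:

from multiprocessing import Process, Manager

def append_one(L, j, y):
    L.append(j * y)  # append order depends on process scheduling

if __name__ == "__main__":
    with Manager() as manager:
        L = manager.list()
        # One process per input element, with y = 2:
        processes = [Process(target=append_one, args=(L, j, 2)) for j in [0, 1]]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(list(L))  # may print [0, 2] or [2, 0]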

The easiest fix for this problem is to pass instead a managed dictionary and an index to your worker function dothing. The index will go from 0 to N-1, where N is the number of splits of your input. Each worker will store its result under its own index in the passed managed dictionary, and the main process will then assemble the final list by iterating the values of the dictionary in key order.

I have also renamed some variables for (my) clarity and to align more closely with PEP 8. In addition, I have relocated some variable declarations to where they more properly belong.

from multiprocessing import Process, Manager
import time
import os

def dothing(d, i, x, y):  # the managed dict `d` passed explicitly.
    l = [j * y for j in x]
    d[i] = l # set index i

if __name__ == "__main__":
    start_time = time.time()
    cpu_count = os.cpu_count()
    y = 2 # Let's make this more interesting than just y = 1
    with Manager() as manager:
        d = manager.dict()  # <-- can be shared between processes.
        x = list(range(110))
        processes = []
        size = len(x) // cpu_count
        for i in range(cpu_count):
            if i < cpu_count-1:
                p = Process(target=dothing, args=(d, i, x[i*size:size*(i+1)], y))  # Passing the dict
            else:
                p = Process(target=dothing, args=(d, i, x[i*size:], y))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
        l = []
        for i in range(cpu_count):
            l.extend(d[i])
        print(l)
        print("--- %s seconds ---" % (time.time() - start_time))

Prints:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218]
--- 0.3932952880859375 seconds ---

If I might also make some further suggestions:

  1. You have x = list(range(110)), which you are splitting. If you instead keep x = range(110), i.e. leave it as a range, then when you do the splitting you will be passing range instances to your worker function rather than lists, which is more efficient (see the short demonstration after this list).
  2. Although it's not likely in your case, if you had more CPU cores than elements in your x iterable, the way you are currently splitting x would create 0-length splits, and you would therefore be creating extra processes that do nothing. In the code below, I have written a split function that splits an iterable into min(len(iterable), N) parts. It also simplifies the logic a bit.
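As a quick demonstration of point 1, slicing a range yields another lightweight range object rather than a list, so no element data is copied when a split is passed to a worker:

x = range(110)
part = x[0:14]
print(part)        # range(0, 14)
print(list(part))  # [0, 1, 2, ..., 13]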

The revised code:

from multiprocessing import Process, Manager
import time
import os

def dothing(d, i, x_split, y):  # the managed dict `d` passed explicitly.
    l = [j * y for j in x_split]
    d[i] = l # set index i

def split(iterable, n):  # function to split iterable in n even parts
    if type(iterable) is range and iterable.step != 1:
        # algorithm doesn't work with steps other than 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return list(iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

if __name__ == "__main__":
    start_time = time.time()
    cpu_count = os.cpu_count()
    y = 2 # Let's make this more interesting than just y = 1
    with Manager() as manager:
        d = manager.dict()  # <-- can be shared between processes.
        x = range(110)
        processes = []
        splits = split(x, cpu_count) # each split will be a range
        # len(splits) can be less than cpu_count
        for i, x_split in enumerate(splits):  # avoid shadowing the split function
            p = Process(target=dothing, args=(d, i, x_split, y))  # Passing the dict
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
        l = []
        for i in range(len(splits)):
            l.extend(d[i])
        print(l)
        print("--- %s seconds ---" % (time.time() - start_time))

Update

If you want to run this for y = 1 .. 2000 inclusive, I would create the tasks by splitting up the y range, so you have fewer processes each doing more work. This is because there is overhead in creating processes, in passing arguments from the main process to the worker function, and in passing results back, overhead that you do not have with serial processing. So for there to be a performance improvement from running multiple tasks in parallel, those tasks must be sufficiently CPU-intensive that the gain from parallelism more than compensates for the additional overhead.

The following code also compares the running time with performing the same calculation using straightforward, simpler serial coding:

from multiprocessing import Process, Manager
import time
import os

def dothing(d, i, y_range):  # the managed dict `d` passed explicitly.
    l = [j * y for y in y_range for j in range(110)]
    d[i] = l # set index i

def dothing_serial(y_range):
    l = [j * y for y in y_range for j in range(110)]
    return l

def split(iterable, n):  # function to split iterable in n even parts
    if type(iterable) is range and iterable.step != 1:
        # algorithm doesn't work with steps other than 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return list(iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

if __name__ == "__main__":
    start_time = time.time()
    cpu_count = os.cpu_count()
    with Manager() as manager:
        d = manager.dict()  # <-- can be shared between processes.
        y_values = range(1, 2001) # y = 1 ... 2000
        processes = []
        y_ranges = split(y_values, cpu_count) # each split will be a range
        # len(y_ranges) can be less than cpu_count
        for i, y_range in enumerate(y_ranges):
            p = Process(target=dothing, args=(d, i, y_range))  # Passing the dict
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
        l = []
        for i in range(len(y_ranges)):
            l.extend(d[i])
        print("--- %s seconds ---" % (time.time() - start_time))
        assert len(l) == 2000 * 110
        # The 110 entries corresponding to y=1 start at offset 0:
        print(l[0:110])
        # The 110 entries corresponding to y=2000 are the last 110 entries:
        print(l[-110:])
    # Compute serial time:
    print('\nSerial run:')
    start_time = time.time()
    l = dothing_serial(y_values)
    assert len(l) == 2000 * 110
    print("--- %s seconds ---" % (time.time() - start_time))

Prints:

--- 0.4081614017486572 seconds ---
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
[0, 2000, 4000, 6000, 8000, 10000, 12000, 14000, 16000, 18000, 20000, 22000, 24000, 26000, 28000, 30000, 32000, 34000, 36000, 38000, 40000, 42000, 44000, 46000, 48000, 50000, 52000, 54000, 56000, 58000, 60000, 62000, 64000, 66000, 68000, 70000, 72000, 74000, 76000, 78000, 80000, 82000, 84000, 86000, 88000, 90000, 92000, 94000, 96000, 98000, 100000, 102000, 104000, 106000, 108000, 110000, 112000, 114000, 116000, 118000, 120000, 122000, 124000, 126000, 128000, 130000, 132000, 134000, 136000, 138000, 140000, 142000, 144000, 146000, 148000, 150000, 152000, 154000, 156000, 158000, 160000, 162000, 164000, 166000, 168000, 170000, 172000, 174000, 176000, 178000, 180000, 182000, 184000, 186000, 188000, 190000, 192000, 194000, 196000, 198000, 200000, 202000, 204000, 206000, 208000, 210000, 212000, 214000, 216000, 218000]

Serial run:
--- 0.015000581741333008 seconds ---

As can be seen, straightforward serial processing runs approximately 27 times faster, strongly suggesting that this problem is not a good candidate for multiprocessing.

If we make the worker function more CPU-intensive by reducing the number of processes to 2, so there are fewer processes to do the work but each process must do more, then the elapsed time becomes .288 seconds instead of .408 seconds, a modest improvement. And if we use only one process, the time doesn't change significantly. If there were no overhead due to multiprocessing, you would expect this timing to be similar to the serial timing. The difference must be attributed to the multiprocessing overhead described previously, which is clearly not trivial.

By using a multiprocessing pool of size 2 instead of individual Process instances, we can reduce the multiprocessing time further. In this case the values are returned via a hidden multiprocessing.Queue in the correct order, which is more efficient than using a managed dictionary:

from multiprocessing import Pool
import time
import os


def dothing(y_range):
    l = [j * y for y in y_range for j in range(110)]
    return l

def split(iterable, n):  # function to split iterable in n even parts
    if type(iterable) is range and iterable.step != 1:
        # algorithm doesn't work with steps other than 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return list(iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def compute_chunksize(iterable_size, pool_size):  # same heuristic Pool.map uses internally
    chunksize, remainder = divmod(iterable_size, (4 * pool_size))
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == "__main__":
    start_time = time.time()
    cpu_count = 2
    y_values = range(1, 2001) # y = 1 ... 2000
    y_ranges = split(y_values, cpu_count) # each split will be a range
    with Pool(cpu_count) as pool:
        # len(y_ranges) can be less than cpu_count
        l = []
        for result in pool.imap(dothing, y_ranges, chunksize=compute_chunksize(len(y_ranges), cpu_count)):
            l.extend(result)
        print("--- %s seconds ---" % (time.time() - start_time))
        assert len(l) == 2000 * 110
        # The 110 entries corresponding to y=1 start at offset 0:
        print(l[0:110])
        # The 110 entries corresponding to y=2000 are the last 110 entries:
        print(l[-110:])
    # Compute serial time:
    print('\nSerial run:')
    start_time = time.time()
    l = dothing(y_values)
    assert len(l) == 2000 * 110
    print("--- %s seconds ---" % (time.time() - start_time))

Prints:

--- 0.13500046730041504 seconds ---
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
[0, 2000, 4000, 6000, 8000, 10000, 12000, 14000, 16000, 18000, 20000, 22000, 24000, 26000, 28000, 30000, 32000, 34000, 36000, 38000, 40000, 42000, 44000, 46000, 48000, 50000, 52000, 54000, 56000, 58000, 60000, 62000, 64000, 66000, 68000, 70000, 72000, 74000, 76000, 78000, 80000, 82000, 84000, 86000, 88000, 90000, 92000, 94000, 96000, 98000, 100000, 102000, 104000, 106000, 108000, 110000, 112000, 114000, 116000, 118000, 120000, 122000, 124000, 126000, 128000, 130000, 132000, 134000, 136000, 138000, 140000, 142000, 144000, 146000, 148000, 150000, 152000, 154000, 156000, 158000, 160000, 162000, 164000, 166000, 168000, 170000, 172000, 174000, 176000, 178000, 180000, 182000, 184000, 186000, 188000, 190000, 192000, 194000, 196000, 198000, 200000, 202000, 204000, 206000, 208000, 210000, 212000, 214000, 216000, 218000]

Serial run:
--- 0.01600050926208496 seconds ---
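Finally, coming back to the original request of calling all of this like a normal function: below is a minimal sketch of such a wrapper (the name multiprocessing_func and its signature are my own, not from the question), built on the same Pool approach, with the split function repeated for completeness:

from multiprocessing import Pool
import os

def dothing(y_range, x):
    return [j * y for y in y_range for j in x]

def split(iterable, n):  # same split function as above
    if type(iterable) is range and iterable.step != 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return list(iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def multiprocessing_func(y_values, x, pool_size=None):
    # Return the combined list for all y values, in order.
    pool_size = pool_size or os.cpu_count()
    y_ranges = split(y_values, pool_size)
    with Pool(len(y_ranges)) as pool:
        # starmap returns the per-range results in submission order:
        results = pool.starmap(dothing, [(y_range, x) for y_range in y_ranges])
    l = []
    for result in results:
        l.extend(result)
    return l

if __name__ == "__main__":
    l = multiprocessing_func(range(1, 2001), range(110), pool_size=2)
    assert len(l) == 2000 * 110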

Conclusion

The above code serves as a useful demonstration of how to use multiprocessing, and as an example of when not to use it, because here it does not improve performance.

Upvotes: 3
