Abdul Qadir
Abdul Qadir

Reputation: 489

Does multiprocessing.pool.imap have a variant (like starmap) that allows for multiple arguments?

I am doing some calculations on large collections of bytes; the processing runs on chunks of bytes. I am trying to use multiprocessing for a performance boost. Initially I tried pool.map, but that only allows a single argument, so I found pool.starmap. However, pool.starmap gives results only when all the processes have finished, and I want results as they come (sort of). pool.imap does provide results as processes finish, but it does not allow multiple arguments (my function requires 2 arguments). Also, the order of the results is important.

Some sample code below:

import multiprocessing as mp
from itertools import repeat

pool = mp.Pool(processes=4)
y = []
for x in pool.starmap(f, zip(da, repeat(db))):
    y.append(x)

The above code works, but only gives the results once all the processes have completed, so I cannot see any progress. That is why I tried pool.imap, which works well but only with a single argument:

pool = mp.Pool(processes=4)
y = []
for x in pool.imap(f, da):
    y.append(x)

With multiple arguments it raises the following exception:

TypeError: f() missing 1 required positional argument: 'd'

I am looking for a simple way to achieve all 3 requirements:

  1. parallel processing using multiple parameters/arguments
  2. manage to see progress while the processes are running
  3. ordered results.

Thanks!

Upvotes: 28

Views: 15751

Answers (5)

Austin A
Austin A

Reputation: 3138

I can answer the first two questions pretty quickly. I think you should be able to handle the third after understanding the first two.

1. Parallel Processing with Multiple Arguments

I'm not sure about a true "starmap" equivalent, but here's an alternative. What I've done in the past is condense my arguments into a single data object, such as a tuple. For example, if you want to pass three arguments to your map_function, you can pack each set of arguments into a tuple and then pass an iterable of those tuples to the .map() or .imap() function.

from multiprocessing import Pool

def map_function(combo):
    # Unpack the packed arguments
    a, b, c = combo
    return a + b + c

if __name__ == '__main__':
    # One tuple per task; map() receives an iterable of them
    combos = [(arg_1, arg_2, arg_3)]

    pool = Pool(processes=4)
    pool.map(map_function, combos)

2. Tracking Progress

A good way to do this is with multiprocessing's shared value. I actually asked (almost) this exact question about a month ago. It allows you to manipulate the same variable from the different processes created by your map function.

Upvotes: 6

lukaz
lukaz

Reputation: 86

Example Case 1 - If your worker needs multiple input parameters, use Pool.starmap() with the parameters packed as an iterable of tuples, e.g. a list of tuples:

import multiprocessing
from multiprocessing import Pool

def powerto(x, n):
    print(f"Worker {multiprocessing.current_process().pid} working on input values {x}^{n}\n")
    return x ** n

def compute_parallel():    
    # Pack parameters for powerto worker by creating a list of tuples for starmap()
    list1 = [1, 2, 3]
    list2 = [2, 3, 4]
    tuple_list = list(zip(list1, list2))
    
    with Pool() as pool:
        result = pool.starmap(powerto, tuple_list)
    print(result)

if __name__ == "__main__":
    compute_parallel()

Example Case 2 - If your worker accepts a single parameter, use Pool.map() as in the example below:

import multiprocessing
from multiprocessing import Pool

def square(x):
    print(f"Worker {multiprocessing.current_process().pid} working on input value {x}\n")
    return x * x

def compute_parallel():
    with Pool() as pool:
        result = pool.map(square, list(range(10,100,10)))
    print(result)

if __name__ == "__main__":
    compute_parallel()

Upvotes: 0

LemmeTestThat
LemmeTestThat

Reputation: 678

This answer comes a bit late (8 years), but I feel there is more to add to the existing answers, for posterity:

  • The answer using functools.partial() from confused00 is pretty good, assuming only 1 argument actually changes in each call (which seems to be the case in the question, but I'm aiming for a more general answer)
  • Austin A's answer involving changing the function signature is the basis of what I'll propose, but directly changing the function signature is not always an option, e.g. if the function in question comes from an external package. I propose a simple general-purpose wrapper function.

I also recommend using the tqdm package for progress bars.

Overall, here is the solution I propose (following the samples from the question and filling in a few missing bits):

from itertools import repeat
from tqdm import tqdm
import multiprocessing as mp

def f(a, b):
    # Assume this is actually an external function we cannot change
    return a+b

def f_wrapped(arg):
    return f(*arg)  # Unpacks args

if __name__ == '__main__':
    da = range(10)
    db = 3

    pool = mp.Pool(processes=4)
    y = []
    for x in tqdm(pool.imap(f_wrapped, zip(da, repeat(db))), total=len(da)):
        y.append(x)

    print(y)

Output:

100%|██████████| 10/10 [00:00<00:00, 63.69it/s]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

Upvotes: 3

confused00
confused00

Reputation: 2612

You can simulate starmap using imap via the functools.partial() function:

import functools
import multiprocessing as mp

def my_function(constant, my_list, optional_param=None):
    print(locals())

with mp.Pool() as pool:
    list(
        pool.imap(
            functools.partial(
                my_function, 2, optional_param=3
            ),
            [1,2,3,4,5]
        )
    )

Outputs:

$ python3 foo.py
{'optional_param': 3, 'my_list': 1, 'constant': 2}
{'optional_param': 3, 'my_list': 3, 'constant': 2}
{'optional_param': 3, 'my_list': 2, 'constant': 2}
{'optional_param': 3, 'my_list': 4, 'constant': 2}
{'optional_param': 3, 'my_list': 5, 'constant': 2}

Upvotes: 11

Mike McKerns
Mike McKerns

Reputation: 35247

I think this solution exactly meets your 3 requirements: https://stackoverflow.com/a/28382913/2379433

In short, p = Pool(); p.imap will let you see progress and maintain order. If you want map functions that take multiple arguments, you can use a fork of multiprocessing that provides better serialization and accepts multiple arguments. See the link for an example.

Upvotes: 0
