Reputation: 9631
In the Python multiprocessing
library, is there a variant of pool.map
which supports multiple arguments?
import multiprocessing
text = "test"
def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()
Upvotes: 919
Views: 1130106
Reputation: 2086
I think the version below is better:
def multi_run_wrapper(args):
    return add(*args)

def add(x, y):
    return x + y

if __name__ == "__main__":
    from multiprocessing import Pool
    with Pool(4) as pool:
        results = pool.map(multi_run_wrapper, [(1, 2), (2, 3), (3, 4)])
    print(results)
Output
[3, 5, 7]
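For comparison, on Python 3.3+ the wrapper isn't needed at all; a minimal sketch of the same computation with pool.starmap():

from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == "__main__":
    with Pool(4) as pool:
        # starmap unpacks each tuple into add(x, y)
        print(pool.starmap(add, [(1, 2), (2, 3), (3, 4)]))  # [3, 5, 7]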
Upvotes: 192
Reputation: 12314
Using Python 3.3+ with pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool

def write_func(i, x):
    print(i, "---", x)

a = ["1", "2", "3"]
b = ["4", "5", "6"]

pool = ThreadPool(2)
pool.starmap(write_func, zip(a, b))
pool.close()
pool.join()
Result:
1 --- 4
2 --- 5
3 --- 6
You can also zip() more arguments if you like: zip(a,b,c,d,e)
In case you want to have a constant value passed as an argument:
import itertools
zip(itertools.repeat(constant), a)
In case your function should return something:
results = pool.starmap(write_func, zip(a,b))
This gives a list of the returned values.
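Putting those pieces together, a minimal sketch reusing write_func from above, with a hypothetical constant "k" as the repeated first argument:

import itertools
from multiprocessing.dummy import Pool as ThreadPool

def write_func(i, x):
    print(i, "---", x)
    return i + x

if __name__ == "__main__":
    a = ["1", "2", "3"]
    with ThreadPool(2) as pool:
        # itertools.repeat() supplies the constant first argument
        results = pool.starmap(write_func, zip(itertools.repeat("k"), a))
    print(results)  # ['k1', 'k2', 'k3']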
Upvotes: 129
Reputation: 1704
Let's keep it simple and straightforward; here is my solution:
from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint
def dosomething(var, s):
    sleep(randint(1, 5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething, s="2"), array)
    print(resp_)
Output:
a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']
Upvotes: 17
Reputation: 1049
A slightly different approach: this example aims to download a bunch of files.
from multiprocessing import Pool
def download_file(batch):
    # Unpack the chunk plus the extra variables that ride along with it;
    # 'etc.' in the original stands in for any further variables.
    items_to_grab, var1, var2 = batch
    ...

## Batch yourself instead of using pool.map's chunksize argument,
## assuming a batch() helper that yields chunks of 200 items:
batches = list(batch(items_to_grab, 200))
## Now create tuples out of each chunk and add the other variables you want to send along:
batches = [(x, var1, var2) for x in batches]

with Pool(5) as p:
    results = p.map(download_file, batches)
Upvotes: 0
Reputation: 3929
A better way is to use a decorator instead of writing a wrapper function by hand. Especially when you have a lot of functions to map, a decorator saves time by avoiding writing a wrapper for every function. Usually a decorated function is not picklable, but we can use functools.wraps to get around it. More discussion can be found here.
Here is the example:
def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y
Then you may map it with zipped arguments:
from multiprocessing import Pool

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
Of course, you may always use Pool.starmap in Python 3 (>=3.3) as mentioned in other answers.
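For reference, a sketch of the same zipped-arguments example with Pool.starmap and an undecorated function, assuming Python 3.3+:

from multiprocessing import Pool

def add(x, y):  # undecorated; starmap does the unpacking
    return x + y

if __name__ == "__main__":
    xlist, ylist = range(10), range(10)
    with Pool(2) as pool:
        res = pool.starmap(add, zip(xlist, ylist))
    print(res)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]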
Upvotes: 10
Reputation: 1656
From Python 3.4.4, you can use multiprocessing.get_context() to obtain a context object to use multiple start methods:
import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q, 'hello', 'world'))
    p.start()
    print(q.get())
    p.join()
Or you can simply replace

pool.map(harvester(text, case), case, 1)

with:

pool.apply_async(harvester, (text, case))

Note that apply_async takes the function and its argument tuple separately, so the function is not called eagerly in the parent process.
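As a rough sketch of that pattern (the sample cases here stand in for RAW_DATASET from the question), apply_async returns AsyncResult objects whose .get() yields the return values:

import multiprocessing

def harvester(text, case):
    return text + str(case[0])

if __name__ == '__main__':
    with multiprocessing.Pool(processes=6) as pool:
        # one task per case; gather the results afterwards
        async_results = [pool.apply_async(harvester, ("test", c)) for c in [[1], [2], [3]]]
        print([r.get() for r in async_results])  # ['test1', 'test2', 'test3']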
Upvotes: 4
Reputation: 13467
Another way is to pass a list of lists to a one-argument routine:
import os
from multiprocessing import Pool
def task(args):
    print("PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1])

pool = Pool()
pool.map(task, [
    [1, 2],
    [3, 4],
    [5, 6],
    [7, 8],
])
One can then construct a list of lists of arguments with one's favorite method.
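For example, a small sketch of one such construction, pairing a varying first argument with a fixed second one via zip() and itertools.repeat():

from itertools import repeat

xs = [1, 3, 5, 7]
fixed = 2
arg_lists = [list(pair) for pair in zip(xs, repeat(fixed))]
# arg_lists == [[1, 2], [3, 2], [5, 2], [7, 2]], ready for pool.map(task, arg_lists)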
Upvotes: 10
Reputation: 4138
Having learnt about itertools in J.F. Sebastian's answer, I decided to take it a step further and write a parmap package that takes care of parallelization, offering map and starmap functions in Python 2.7 and Python 3.2 (and later) that can take any number of positional arguments.
Installation
pip install parmap
How to parallelize:
import parmap

# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x, y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param1 = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
    listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
I have uploaded parmap to PyPI and to a GitHub repository.
As an example, the question can be answered as follows:
import parmap
def harvester(case, text):
    X = case[0]
    text + str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
Upvotes: 31
Reputation: 35207
There's a fork of multiprocessing called pathos (note: use the version on GitHub) that doesn't need starmap: its map functions mirror the API of Python's map, so map can take multiple arguments.

With pathos, you can also generally do multiprocessing in the interpreter, instead of being stuck in the __main__ block. Pathos is due for a release, after some mild updating, mostly conversion to Python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
pathos has several ways to get the exact behavior of starmap.
>>> def add(*x):
... return sum(x)
...
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>>
Upvotes: 19
Reputation: 1025
A better solution for Python 2:
from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b

pool = Pool(3)
pool.map(func, [(0, (1, 2)), (1, (2, 3)), (2, (3, 4))])
2 3 4
1 2 3
0 1 2
Output:
[3, 5, 7]
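Note that tuple parameter unpacking in def was removed in Python 3 (PEP 3113); a sketch of the Python 3 equivalent unpacks inside the function body instead:

from multiprocessing import Pool

def func(packed):
    i, (a, b) = packed  # unpack manually; def func((i, (a, b))) is Python 2 only
    print(i, a, b)
    return a + b

if __name__ == '__main__':
    pool = Pool(3)
    print(pool.map(func, [(0, (1, 2)), (1, (2, 3)), (2, (3, 4))]))  # [3, 5, 7]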
Upvotes: 10
Reputation: 115
For Python 2, you can use this trick:

import multiprocessing

def fun(a, b):
    return a + b

pool = multiprocessing.Pool(processes=6)
b = 233
# Note: the standard multiprocessing.Pool pickles the mapped function,
# and lambdas are not picklable, so this only works with thread pools
# (multiprocessing.dummy) or serializers that handle lambdas (e.g. dill/pathos).
pool.map(lambda x: fun(x, b), range(1000))
Upvotes: -2
Reputation: 319
Store all your arguments as an array of tuples. Normally, you would call your function like this:
def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:
Instead pass one tuple and unpack the arguments:
def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]
Build up the tuple by using a loop beforehand:
package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny - 1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))
Then execute all using map by passing the array of tuples:
array_rgb_values = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255 * fragColor.r), 0, 255)
        ig = clip(int(255 * fragColor.g), 0, 255)
        ib = clip(int(255 * fragColor.b), 0, 255)
        array_rgb_values.append((ir, ig, ib))
I know Python has *
and **
for unpacking, but I haven't tried those yet.
It's also better to use the higher-level concurrent.futures library than the low-level multiprocessing library.
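For what it's worth, a minimal sketch of that * unpacking, with render as a hypothetical stand-in for the three-parameter mainImage:

import concurrent.futures

def render(frag_coord, resolution, time_s):
    # stand-in for the original three-parameter function
    return (frag_coord, resolution, time_s)

def render_star(package):
    # '*' splats the tuple back into separate arguments
    return render(*package)

if __name__ == "__main__":
    package_iter = [((i, j), (8, 8, 1), 10) for j in range(2) for i in range(2)]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(render_star, package_iter))
    print(results)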
Upvotes: 0
Reputation: 141
import time
from multiprocessing import Pool

def f1(args):
    vfirst, vsecond, vthird = args[0], args[1], args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')

if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog', 'Cat', 'Mouse']])
    p.close()
    p.join()
    print(result)
Upvotes: 2
Reputation: 3563
This might be another option. The trick is in the wrapper function that returns another function, which is passed in to pool.map. The code below reads an input array, and for each (unique) element in it returns how many times (i.e., counts) that element appears in the array. For example, if the input is

np.eye(3) = [[1. 0. 0.]
             [0. 1. 0.]
             [0. 0. 1.]]

then zero appears 6 times and one appears 3 times.
import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count
def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out

def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results

def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun

def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)
if __name__ == "__main__":
    img = np.ones([2, 2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
You should get:
input array:
[[1. 1.]
[1. 1.]]
label counts: {1.0: 4}
========
input array:
[[1. 0. 0.]
[0. 1. 0.]
[0. 0. 1.]]
label counts: {0.0: 6, 1.0: 3}
========
input array:
[[4 4 0]
[2 4 3]
[2 3 1]]
label counts: {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========
Upvotes: 2
Reputation: 2074
How to take multiple arguments:
def f1(args):
    a, b, c = args[0], args[1], args[2]
    return a + b + c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4)
    result1 = pool.map(f1, [[1, 2, 3]])
    print(result1)
Upvotes: 94
Reputation: 3750
Here is another way to do it that, IMHO, is simpler and more elegant than any of the other answers provided.
This program has a function that takes two parameters, prints them out and also prints the sum:
import multiprocessing

def main():
    with multiprocessing.Pool(10) as pool:
        params = [(2, 2), (3, 3), (4, 4)]
        pool.starmap(printSum, params)
    # end with
# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()
The output is:
num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8
See the Python docs for more info:
https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool
In particular, be sure to check out the starmap function.
I'm using Python 3.6; I'm not sure whether this will work with older Python versions. Why there isn't a straightforward example like this in the docs, I'm not sure.
Upvotes: 7
Reputation: 414069
is there a variant of pool.map which supports multiple arguments?

Python 3.3 includes the pool.starmap() method:
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
    return a + b

def main():
    a_args = [1, 2, 3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__ == "__main__":
    freeze_support()
    main()
For older versions:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1, 2, 3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__ == "__main__":
    freeze_support()
    main()
1 1
2 1
3 1
Notice how itertools.izip() and itertools.repeat() are used here.
Due to the bug mentioned by @unutbu, you can't use functools.partial() or similar capabilities on Python 2.6, so the simple wrapper function func_star() should be defined explicitly. See also the workaround suggested by uptimebox.
Upvotes: 836
Reputation: 4036
There are many answers here, but none seem to provide Python 2/3 compatible code. If you want your code to just work, this approach works on either Python version:
import sys
import multiprocessing

# For Python 2/3 compatibility, define a pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool
After that, you can use multiprocessing the regular Python 3 way, however you like. For example:
def _function_to_run_for_each(x):
    return x.lower()

with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])
print(results)
will work in Python 2 or Python 3.
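And since the question is about multiple arguments, a sketch of combining this shim with functools.partial to freeze an extra argument (the suffix parameter is chosen here for illustration):

from functools import partial

def _tag(name, suffix):
    return name.lower() + suffix

with multiprocessing_context(processes=3) as pool:
    results = pool.map(partial(_tag, suffix='!'), ['Bob', 'Sue', 'Tim'])
print(results)  # ['bob!', 'sue!', 'tim!']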
Upvotes: 3
Reputation: 21
This is an example of the routine I use to pass multiple arguments to a one-argument function used with pool.imap:
from multiprocessing import Pool
# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun
    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+', var2[j], '=', value)

# Close the pool
pool.close()
Upvotes: 2
Reputation: 31
import multiprocessing

text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # Doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one.
    pool.map(unpack, args)
    pool.close()
    pool.join()
Upvotes: 2
Reputation: 150947
The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian.1 It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:
import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)
    # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)
import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)
    # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)
    # Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.
Upvotes: 511
Reputation: 281
The official documentation states that it supports only one iterable argument. I like to use apply_async in such cases. In your case, I would do:
from multiprocessing import Process, Pool, Manager

text = "test"

def harvester(text, case, q=None):
    X = case[0]
    res = text + str(X)
    if q:
        q.put(res)
    return res

def block_until(q, results_queue, until_counter=0):
    i = 0
    while i < until_counter:
        results_queue.put(q.get())
        i += 1

if __name__ == '__main__':
    pool = Pool(processes=6)
    case = RAW_DATASET
    m = Manager()
    q = m.Queue()
    results_queue = m.Queue()  # when it completes, results will reside in this queue
    blocking_process = Process(target=block_until, args=(q, results_queue, len(case)))
    blocking_process.start()
    for c in case:
        try:
            res = pool.apply_async(harvester, (text, c, q))
            res.get(timeout=0.1)
        except:
            pass
    blocking_process.join()
Upvotes: 3
Reputation: 1323
Another simple alternative is to wrap your function parameters in a tuple and then wrap the parameters that should be passed in tuples as well. This is perhaps not ideal when dealing with large pieces of data. I believe it would make copies for each tuple.
from multiprocessing import Pool
def f((a, b, c, d)):
    print a, b, c, d
    return a + b + c + d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0, i+1, i+2, i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()
Gives the output in some random order:
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Upvotes: 9
Reputation: 705
You can use the following two functions to avoid writing a wrapper for each new function:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))
Use the function function with the lists of arguments arg_0, arg_1 and arg_2 as follows:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2))
pool.close()
pool.join()
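A quick sketch of those helpers in action, with add3 as a stand-in function (the helpers are repeated so the example is self-contained):

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

def add3(a, b, c):  # stand-in for 'function'
    return a + b + c

if __name__ == "__main__":
    arg_0, arg_1, arg_2 = [1, 2], [10, 20], [100, 200]
    pool = Pool(2)
    print(pool.map(universal_worker, pool_args(add3, arg_0, arg_1, arg_2)))  # [111, 222]
    pool.close()
    pool.join()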
Upvotes: 9