bsdfish

Reputation: 2454

Parallel mapping functions in IPython w/ multiple parameters

I'm trying to use IPython's parallel environment and so far it's looking great, but I'm running into a problem. Let's say that I have a function, defined in a library,

def func(a,b):
   ...

that I use when I want to evaluate on one value of a and a bunch of values of b.

[func(myA, b) for b in myLongList]

Obviously, the real function is more complicated but the essence of the matter is that it takes multiple parameters and I'd like to map over only one of them. The problem is that map, @dview.parallel, etc. map over all the arguments.

So let's say I want to get the answer to func(myA, myLongList). The obvious way to do this is to curry, either with functools.partial or just as

dview.map_sync(lambda b: func(myA, b), myLongList)

However, this does not work correctly on remote machines. The reason is that when the lambda expression is pickled, the value of myA is not included and instead, the value of myA from the local scope on the remote machine is used. When closures get pickled, the variables they close over don't.

Two ways I can think of doing this that will actually work are to manually construct lists for every argument and have map work over all of the arguments,

dview.map_sync(func, [myA]*len(myLongList), myLongList)   

or to horrifically use the data as default arguments to a function, forcing it to get pickled:

# Can't use a lambda here b/c lambdas don't use default arguments :(
def parallelFunc(b, myA = myA):
    return func(myA, b)

dview.map_sync(parallelFunc, myLongList)
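One way to see the difference between the two forms without a cluster is to inspect the function objects themselves. This is only a minimal local sketch, run at module level; the body of func and the value of myA are placeholders. The lambda records myA as a name to be looked up when it is called, while the default-argument version stores the value on the function object.

```python
def func(a, b):                  # placeholder body for the real library function
    return (a, b)

myA = 42                         # placeholder value

curried = lambda b: func(myA, b)

def parallelFunc(b, myA=myA):
    return func(myA, b)

# At module level the lambda refers to myA only by name; the value is
# looked up in whatever namespace the function runs in.
lambda_global_names = curried.__code__.co_names

# The default-argument version keeps the value on the function object.
stored_defaults = parallelFunc.__defaults__
```

Whatever serializes the function therefore has the value available in the second case, but only a name in the first.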

Really, this all seems horribly contorted when the real function takes a lot of parameters and is more complicated. Is there some idiomatic way of doing this? Something like

@parallel(mapOver='b')
def bigLongFn(a, b):
   ...

but as far as I know, nothing like the 'mapOver' thing exists. I probably have an idea of how to implement it ... this just feels like a very basic operation that there should exist support for so I want to check if I'm missing something.
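For what it's worth, a helper along these lines might look like the sketch below. The name map_over and its signature are hypothetical, not part of IPython; it simply automates the "repeat every fixed argument" approach from above for any map-like callable that takes one sequence per positional argument. It is shown here with the builtin map (Python 3); passing dview.map_sync instead is an untested assumption.

```python
import inspect

def map_over(map_fn, func, name, values, **fixed):
    """Call map_fn(func, *seqs), varying only parameter `name` over `values`.

    Hypothetical helper: map_fn is any map-like callable that accepts one
    sequence per positional argument of func.
    """
    values = list(values)
    seqs = []
    for param in inspect.signature(func).parameters:
        if param == name:
            seqs.append(values)
        else:
            # Repeat the fixed value once per item, like [myA] * len(myLongList).
            seqs.append([fixed[param]] * len(values))
    return map_fn(func, *seqs)

def bigLongFn(a, b):             # placeholder body
    return a * b

result = list(map_over(map, bigLongFn, 'b', [1, 2, 3], a=10))
```

Because only plain lists are handed to the map function, nothing depends on how closures are serialized.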

Upvotes: 37

Views: 6995

Answers (5)

Kabira K

Reputation: 2007

I am posting Alex S.'s comment as an answer. This is probably the right approach for this problem:

Just do partial application with a lambda. I know it looks weird, but using my_f = lambda a,my=other,arguments=go,right=here : f(a,my,arguments,right) is the simplest way to go about it without falling into pickling and pushing problems.
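A small local sketch of that pattern, using the builtin map in place of a parallel view (f and the fixed values are placeholders): the defaults are evaluated once, when the lambda is created, so later changes to the outer names do not affect it.

```python
def f(a, my, arguments, right):  # placeholder for the real function
    return (a, my, arguments, right)

other, go, here = 1, 2, 3        # placeholder values for the fixed arguments

# Defaults are evaluated when the lambda is created and stored on it.
my_f = lambda a, my=other, arguments=go, right=here: f(a, my, arguments, right)

other = 'changed later'          # rebinding the name does not affect my_f

results = list(map(my_f, [10, 20]))
```

Note that f itself is still referenced by name, so it presumably has to be available wherever my_f ends up running.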

Upvotes: 0

Paul R

Reputation: 319

I can improve a bit on batu's answer (which I think is a good one, but doesn't perhaps document in as much detail WHY you use those options). The ipython documentation is also currently woefully inadequate on this point. So your function is of the form:

def myfxn(a,b,c,d):
  ....
  return z

and stored in a file called mylib. Let's say b, c, and d are the same during your run, so you write a lambda function to reduce it to a 1-parameter function.

import mylib
mylamfxn=lambda a:mylib.myfxn(a,b,c,d)

and you want to run:

z=dview.map_sync(mylamfxn, iterable_of_a)

In a dream world, everything would magically work like that. However, first you'd get an error of "mylib not found," because the ipcluster processes haven't loaded mylib. Make sure the ipcluster processes have "mylib" in their python path and are in the correct working directory for myfxn, if necessary. Then you need to add to your python code:

dview.execute('import mylib')

which runs the import mylib command on each process. If you try again, you'll get an error along the lines of "global variable b not defined" because while the variables are in your Python session, they aren't in the ipcluster processes. However, IPython provides a method of copying a group of variables to the subprocesses. Continuing the example above:

mydict=dict(b=b, c=c, d=d)
dview.push(mydict)

Now all of the subprocesses have access to b,c,and d. Then you can just run:

z=dview.map_sync(mylamfxn, iterable_of_a)

and it should now work as advertised. Anyway, I'm new to parallel computing with python, and found this thread useful, so I thought I'd try to help explain a few of the points that confused me a bit....
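The sequence of errors above can be imitated locally without a cluster. This is only an analogy, not how IPython is implemented: a plain dict stands in for one engine's namespace, exec stands in for running code on the engine, and the dict update stands in for the import and push steps. The function body and values are placeholders.

```python
def myfxn(a, b, c, d):           # placeholder for mylib.myfxn
    return a + b + c + d

# Stand-in for an engine's namespace, which starts without our names.
engine_ns = {}
exec("mylamfxn = lambda a: myfxn(a, b, c, d)", engine_ns)

# Before anything is imported or pushed, the names cannot be resolved.
try:
    engine_ns['mylamfxn'](1)
    failed_before_push = False
except NameError:
    failed_before_push = True

# Stand-in for dview.execute('import mylib') and dview.push(dict(b=b, c=c, d=d)).
engine_ns.update(myfxn=myfxn, b=1, c=2, d=3)

z = [engine_ns['mylamfxn'](a) for a in [10, 20]]
```

Once the names exist in the namespace where the lambda runs, the same call succeeds.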

The final code would be:

import mylib

#set up parallel processes, start ipcluster from command line prior!
from IPython.parallel import Client
rc=Client()
dview=rc[:]

#...do stuff to get iterable_of_a and b,c,d....

mylamfxn=lambda a:mylib.myfxn(a,b,c,d)

dview.execute('import mylib')
mydict=dict(b=b, c=c, d=d)
dview.push(mydict)
z=dview.map_sync(mylamfxn, iterable_of_a)

This is probably the quickest and easiest way to make pretty much any embarrassingly parallel code run parallel in python....

UPDATE: You can also use dview to push all the data without loops and then use an lview (i.e. lview=rc.load_balanced_view(); lview.map(...)) to do the actual calculation in load-balanced fashion.

Upvotes: 15

batu

Reputation: 101

This is my first message to StackOverflow so please be gentle ;) I was trying to do the same thing, and came up with the following. I am pretty sure this is not the most efficient way, but seems to work somewhat. One caveat for now is that for some reason I only see two engines working at 100%, the others are sitting almost idle...

In order to call a multiple arg function in map I first wrote this routine in my personal parallel.py module:

def map(r, func, args=None, modules=None):
    """
    Before you run parallel.map, start your cluster (e.g. ipcluster start -n 4)

    map(r,func, args=None, modules=None):
    args=dict(arg0=arg0,...)
    modules='numpy, scipy'

    examples:
    func= lambda x: numpy.random.rand()**2.
    z=parallel.map(r_[0:1000], func, modules='numpy, numpy.random')
    plot(z)

    A=ones((1000,1000));
    l=range(0,1000)
    func=lambda x : A[x,l]**2.
    z=parallel.map(r_[0:1000], func, dict(A=A, l=l))
    z=array(z)

    """
    from IPython.parallel import Client
    mec = Client()
    mec.clear()
    lview = mec.load_balanced_view()
    # Push the shared arguments and run the imports on every engine.
    for k in mec.ids:
        mec[k].activate()
        if args is not None:
            mec[k].push(args)
        if modules is not None:
            mec[k].execute('import ' + modules)
    z = lview.map(func, r)
    out = z.get()
    return out

As you can see, the function takes an args parameter, which is a dict of parameters in the head node's workspace. These parameters are then pushed to the engines. At that point they become local objects and can be used in the function directly. For example, in the last example given in the docstring above, the A matrix is sliced using the engine-local variable l.

I must say that even though the above function works, I am not 100% happy with it at the moment. If I can come up with something better, I will post it here.

UPDATE 2013/04/11: I made minor changes to the code:
- The activate statement was missing brackets, which caused it not to run.
- Moved mec.clear() to the top of the function, as opposed to the end.

I also noticed that it works best if I run it within IPython. For example, I may get errors if I run a script using the above function as "python ./myparallelrun.py" but not if I run it within IPython using "%run ./myparallelrun.py". Not sure why...

Upvotes: 6

dnozay

Reputation: 24304

Let's build on that:

dview.map_sync(func, [myA]*len(myLongList), myLongList)

maybe the following would work:

from itertools import izip_longest
dview.map_sync(func, izip_longest(myLongList, [], fillvalue=myA))

example:

>>> # notice that a is a tuple
... concat = lambda a: '%s %s' % a
>>> mylonglist = range(10)
>>> from itertools import izip_longest
>>> map(concat, izip_longest(mylonglist, [], fillvalue='mississippi'))
['0 mississippi', '1 mississippi', '2 mississippi', '3 mississippi',
'4 mississippi', '5 mississippi', '6 mississippi', '7 mississippi',
'8 mississippi', '9 mississippi']
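In Python 3, izip_longest is spelled itertools.zip_longest. The sketch below uses the builtin map and placeholder names to show what the pairing produces, plus an alternative with itertools.repeat for a function that takes two separate arguments. Whether a parallel view's map accepts a non-list iterable such as repeat is not verified here, so treat that part as an assumption.

```python
from itertools import repeat, zip_longest

def func(a, b):                  # placeholder two-argument function
    return '%s %s' % (b, a)

my_a = 'mississippi'
my_long_list = list(range(3))

# The empty second iterable is padded with the fill value, giving (b, my_a) pairs.
pairs = list(zip_longest(my_long_list, [], fillvalue=my_a))

# With the builtin map, repeat() supplies the fixed argument for every item.
via_repeat = list(map(func, repeat(my_a), my_long_list))
```

Note that the pairs come out as (b, my_a), so a function mapped over them receives one tuple per item, in that order.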

Upvotes: 0

BostonJohn

Reputation: 2661

An elegant way to do this is with partial functions.

If you know that you want the first argument of foo to be myArg, you can create a new function bar by

from functools import partial
bar = partial(foo, myArg)

bar(otherArg) will then return foo(myArg,otherArg)
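As a small local check (with the builtin pow standing in for foo and 2 for myArg), a partial object keeps its bound arguments on itself, and they survive a round trip through the standard-library pickle. How IPython's own serialization treats partials is not shown here and may differ.

```python
import pickle
from functools import partial

bar = partial(pow, 2)            # pow stands in for foo, 2 for myArg

# The bound argument is stored on the partial object itself.
bound_args = bar.args

# Round-trip through the standard-library pickle; the bound value travels along.
restored = pickle.loads(pickle.dumps(bar))
value = restored(10)             # same as pow(2, 10)
```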

Upvotes: 0
