rnorris

Reputation: 2092

Sharing numpy arrays in python multiprocessing pool

I'm working on code that does fairly heavy numerical work on a large set of problems (tens to hundreds of thousands of numerical integrations). Fortunately, these integrations are embarrassingly parallel, so it's easy to use Pool.map() to split the work across multiple cores.

Right now, I have a program that has this basic workflow:

#!/usr/bin/env python
from multiprocessing import Pool
from numpy import linspace, save
from my_parser import parse_numpy_array
from my_project import heavy_computation

#X is a global multidimensional numpy array
X = parse_numpy_array("input.dat")
param_1 = 0.0168
param_2 = 1.505

def do_work(arg):
  return heavy_computation(X, param_1, param_2, arg)

if __name__=='__main__':
  pool = Pool()
  arglist = linspace(0.0,1.0,100)
  results = pool.map(do_work,arglist)
  #save results in a .npy file for analysis
  save("Results", [X,results])

Since X, param_1, and param_2 are hard-coded and initialized in exactly the same way for each process in the pool, this all works fine. Now that I have my code working, I'd like to make it so that the file name, param_1, and param_2 are input by the user at run-time, rather than being hard-coded.

One thing that should be noted is that X, param_1, and param_2 are not modified as the work is being done. Since I don't modify them, I could do something like this at the beginning of the program:

import sys
X = parse_numpy_array(sys.argv[1])
param_1 = float(sys.argv[2])
param_2 = float(sys.argv[3])

That would do the trick, but since most users of this code run it on Windows machines, I'd rather not go the route of command-line arguments.

What I would really like to do is something like this:

X, param_1, param_2 = None, None, None

def init(x,p1, p2):
  X = x
  param_1 = p1
  param_2 = p2

if __name__=='__main__':
  filename = raw_input("Filename> ")
  param_1 = float(raw_input("Parameter 1: "))
  param_2 = float(raw_input("Parameter 2: "))
  X = parse_numpy_array(filename)
  pool = Pool(initializer = init, initargs = (X, param_1, param_2,))
  arglist = linspace(0.0,1.0,100)
  results = pool.map(do_work,arglist)
  #save results in a .npy file for analysis
  save("Results", [X,results])

But, of course, this fails and X/param_1/param_2 are all None when the pool.map call happens. I'm pretty new to multiprocessing, so I'm not sure why the call to the initializer fails. Is there a way to do what I want to do? Is there a better way to go about this altogether? I've also looked at using shared data, but from my understanding of the documentation, that only works on ctypes, which don't include numpy arrays. Any help with this would be greatly appreciated.

Upvotes: 7

Views: 6175

Answers (2)

erasing

Reputation: 526

I had a similar problem. If you just want to read my solution skip some lines :) I had to:

  • share a numpy.array between worker processes operating on different parts of it and...
  • pass Pool.map a function with more than one argument.

I noticed that:

  • the data of the numpy.array was correctly read but...
  • changes on the numpy.array were not made permanent
  • Pool.map had problems handling lambda functions, or so it appeared to me (if this point is not clear to you, just ignore it)

My solution was to:

  • make the target function's only argument a list
  • make the target function return the modified data instead of directly trying to write on the numpy.array
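Those two points can be sketched roughly as follows, assuming Python 3 and numpy. The names scale_chunk and scale_in_parallel and the sample data are made up for illustration and are not part of the original code.

```python
from multiprocessing import Pool

import numpy as np


def scale_chunk(args):
    # Pool.map passes exactly one item per call, so unpack the packed input.
    chunk, factor = args
    # The worker only sees its own copy of the chunk, so in-place writes
    # would be lost; return the modified data instead.
    return chunk * factor


def scale_in_parallel(array, factor, pieces=4, processes=2):
    # Split the array, let each worker return its modified piece,
    # then reassemble the pieces in the parent process.
    tasks = [(chunk, factor) for chunk in np.array_split(array, pieces)]
    pool = Pool(processes)
    try:
        results = pool.map(scale_chunk, tasks)
    finally:
        pool.close()
        pool.join()
    return np.concatenate(results)


if __name__ == "__main__":
    data = np.arange(10, dtype=float)  # placeholder input
    print(scale_in_parallel(data, 2.0))
```

pool.map returns results in the same order as the inputs, which is why a plain concatenate is enough to rebuild the array.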

I understand that your do_work function already returns the computed data, so you would just have to modify do_work to accept a list (containing X, param_1, param_2 and arg) as its argument, and pack the input to the target function in this format before passing it to Pool.map.

Here is a sample implementation:

def do_work2(args):
    X,param_1,param_2,arg = args
    return heavy_computation(X, param_1, param_2, arg)

Now you have to pack the input to the do_work2 function before calling it. Your main becomes:

if __name__=='__main__':
   filename = raw_input("Filename> ")
   param_1 = float(raw_input("Parameter 1: "))
   param_2 = float(raw_input("Parameter 2: "))
   X = parse_numpy_array(filename)
   # now you pack the input arguments
   arglist = [[X,param_1,param_2,n] for n in linspace(0.0,1.0,100)]
   # this list holds 100 references to X, not 100 copies
   # (the arguments are still pickled when they are sent to the workers)
   pool = Pool()
   results = pool.map(do_work2,arglist)
   #save results in a .npy file for analysis
   save("Results", [X,results])
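As a variation on the same idea, functools.partial can bind the fixed arguments so that the pool only varies the last one. This is a sketch under assumptions: Python 3, a hypothetical stand-in for heavy_computation, and a plain list in place of the parsed array.

```python
from functools import partial
from multiprocessing import Pool


def heavy_computation(X, param_1, param_2, arg):
    # Hypothetical stand-in for the real my_project.heavy_computation.
    return sum(X) * param_1 + param_2 * arg


if __name__ == "__main__":
    X = [1.0, 2.0, 3.0]  # placeholder for the parsed numpy array
    # Bind the three fixed arguments; the pool supplies only `arg`.
    work = partial(heavy_computation, X, 0.0168, 1.505)
    arglist = [i / 10.0 for i in range(11)]
    pool = Pool(2)
    try:
        results = pool.map(work, arglist)
    finally:
        pool.close()
        pool.join()
    print(results)
```

The bound arguments still have to be pickled and sent to the workers, so this changes the calling convention rather than the cost of moving X around.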

Upvotes: 5

Ken

Reputation: 1868

To make your last idea work, I think you can simply make X, param_1, and param_2 global variables by using the global keyword before modifying them inside the if statement. So add the following:

global X
global param_1
global param_2

directly after the if __name__ == '__main__'.

Upvotes: -2
