I am working on a simulation in which I need to evaluate an expensive numerical integral at many different time points. Each integrand is a function of the time it integrates up to, so the integral at every time point must be evaluated independently. Since each integral is independent of all the others, the problem is embarrassingly parallel.
I would like to run this on an HPC cluster, so I have attempted to parallelize it using mpi4py; however, in my current implementation every processor performs the entire calculation (including the scattering to the other cores) rather than parallelizing only the for loop inside the object. As written, running on n cores takes n times as long as running on one core (not a good sign...).
Because the computation itself is the only step that takes any appreciable time, I would like everything except that specific for loop to run on the root process.
Below is a pseudo-code reduction of my current implementation:
import numpy as np
from mpi4py import MPI

COMM = MPI.COMM_WORLD


class Integrand:
    def __init__(self, t_max, dt, **kwargs):
        self.t_max = t_max
        self.dt = dt
        self.time_sample = np.arange(0, self.t_max, self.dt)
        self.function_args = kwargs
        self.final_result = np.empty_like(self.time_sample, dtype=complex)

    def do_integration(self):
        if COMM.rank == 0:
            # split() partitions the time samples round-robin across the ranks
            times_partitioned = split(self.time_sample, COMM.size)
        else:
            times_partitioned = None
        times_partitioned = COMM.scatter(times_partitioned, root=0)
        results = np.empty(times_partitioned.shape, dtype=complex)
        for counter, t in enumerate(times_partitioned):
            results[counter] = computation(self, t, **self.function_args)
        results = COMM.gather(results, root=0)
        if COMM.rank == 0:
            # interleave the gathered chunks back into time order
            for i in range(COMM.size):
                self.final_result[i::COMM.size] = results[i]


if __name__ == '__main__':
    kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
    for kwargs in kwargs_set:
        integrand_object = Integrand(**kwargs)
        integrand_object.do_integration()
        save_and_plot_results(integrand_object.final_result)
Upvotes: 0
Views: 113
A simple way to parallelize this problem without drastically changing how the class is called or used is a decorator. Rather than every core building an identical object and repeating the full calculation, the decorator (shown below) gives each core an object holding only the chunk of time steps that core needs to evaluate. For example, with two cores, rank 0 evaluates time indices 0, 2, 4, ... while rank 1 evaluates 1, 3, 5, ... Once all chunks have been evaluated, the decorator gathers the results, interleaves them back into time order, and returns a single object with the full result on the root core. Note that this implementation changes the class's behavior slightly: the integral is now evaluated at creation time.
from functools import wraps

import numpy as np
from mpi4py import MPI

COMM = MPI.COMM_WORLD


def parallelize_integrand(integral_class):
    def split(container, count):
        # Partition the container round-robin across `count` ranks.
        return [container[_i::count] for _i in range(count)]

    @wraps(integral_class)
    def wrapper(*args, **kwargs):
        int_object = integral_class(*args, **kwargs)
        time_sample_total = int_object.time_sample
        if COMM.rank == 0:
            split_time = split(time_sample_total, COMM.size)
            final_result = np.empty_like(int_object.result)
        else:
            split_time = None
        # Each rank receives and integrates only its own chunk of time steps.
        split_time = COMM.scatter(split_time, root=0)
        int_object.time_sample = split_time
        int_object.do_integration()
        result = COMM.gather(int_object.result, root=0)
        if COMM.rank == 0:
            # Interleave the gathered chunks back into time order.
            for i in range(COMM.size):
                final_result[i::COMM.size] = result[i]
            int_object.time_sample = time_sample_total
            int_object.result = final_result
        return int_object

    return wrapper


@parallelize_integrand
class Integrand:
    def __init__(self, t_max, dt, **kwargs):
        self.t_max = t_max
        self.dt = dt
        self.time_sample = np.arange(0, self.t_max, self.dt)
        self.kwargs = kwargs
        self.result = np.empty_like(self.time_sample, dtype=complex)

    def do_integration(self):
        # Size the result to whatever chunk of time steps this rank holds.
        self.result = np.empty(self.time_sample.shape, dtype=complex)
        for counter, t in enumerate(self.time_sample):
            self.result[counter] = computation(self, t, **self.kwargs)


if __name__ == '__main__':
    kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
    for kwargs in kwargs_set:
        integrand_object = Integrand(**kwargs)
        if COMM.rank == 0:
            # Only the root rank holds the fully assembled result.
            save_and_plot_results(integrand_object.result)
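Note that every rank still executes the entire script; the decorator only ensures that each rank integrates its own chunk and that the root rank receives the assembled result. The script is launched in the usual MPI fashion, e.g. with four processes (assuming it is saved as integrate.py, a file name chosen purely for illustration):

mpiexec -n 4 python integrate.py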
Upvotes: 1