PJ R
PJ R

Reputation: 111

Parallelization within a python object

I am working on a simulation where I need to compute an expensive numerical integral at many different time points. Each integrand is a function of the time it is sampling up to, so I must evaluate each of the points independently. Because each integral is independent of all others, this can be implemented in an embarrassingly parallel fashion.

I would like to run this on an HPC cluster, so I have attempted to parallelize this process using mpi4py; however, my current implementation causes each processor to do the entire calculation (including the scattering to other cores) rather than have only the for loop inside of the object parallelized. As written, with n cores this takes n times as long as with one core (not a good sign...).

Because the only step which takes any amount of time is the computation itself, I would like everything except that specific for loop to run on the root node.

Below is a pseudo-code reduction of my current implementation:

import numpy as np
from mpi4py import MPI

COMM = MPI.COMM_WORLD

class Integrand:

    def __init__(self, t_max, dt, **kwargs):
        self.t_max = t_max
        self.dt = dt
        self.time_sample = np.arange(0, self.t_max, self.dt)
        self.function_args = kwargs
        self.final_result = np.empty_like(self.time_sample)

    def do_integration(self):
        if COMM.rank == 0:
            times_partitioned = split(self.time_sample, COMM.size)
        else:
            times_partitioned = None

        times_partitioned = COMM.scatter(times_partitioned, root=0)

        results = np.empty(times_partitioned.shape, dtype=complex)

        for counter, t in enumerate(times_partitioned):
            results = computation(self, t, **self.function_args)

        results = MPI.COMM_WORLD.gather(results, root=0)

        if COMM.rank is 0:
            ##inter-leaf back together
            for i in range(COMM.size):
                self.final_result[i::COMM.size] = results[i]

if __name__  = '__main__':

    kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]

    for kwargs in kwargs_set:
        integrand_object = Integrand(**kwargs)
        integrand_object.do_integration()
        save_and_plot_results(integrand_object.final_result)

Upvotes: 0

Views: 113

Answers (1)

PJ R
PJ R

Reputation: 111

A simple way to parallelize this problem without drastically changing how the class is called/used is to make use of a decorator. The decorator (shown below) makes it so that rather than creating the same object on every core, each core creates an object with the chunk of the time steps it needs to evaluate. After they have all been evaluated it gathers their results and returns a single object with the full result to one core. This particular implementation changes the class functionality slightly by forcing evaluation of the integral at creation time.

from functools import wraps
import numpy as np
from mpi4py import MPI

COMM = MPI.COMM_WORLD

def parallelize_integrand(integral_class):
    def split(container, count):
        return [container[_i::count] for _i in range(count)]

    @wraps(integral_class)
    def wrapper(*args,**kwargs):
         int_object = integral_class(*args, **kwargs)
         time_sample_total = int_object.time_sample
         if COMM.rank is 0:
             split_time = split(time_sample_total,COMM.size)
             final_result = np.empty_like(int_object.result)
         else:
             split_time = None
         split_time = COMM.scatter(split_time, root=0)
         int_object.time_sample = split_time
         int_object.do_integration()
         result = int_object.result
         result = COMM.gather(result, root=0)

         if COMM.rank is 0:
             for i in range(COMM.size):
                 final_result[i::COMM.size] = result[i]
             int_object.time_sample = time_sample_total
             int_object.result = final_result
             return int_object


@parallelize_integrand
class Integrand:

    def __init__(self, t_max, dt, **kwargs):
        self.t_max = t_max
        self.dt = dt
        self.time_sample = np.arange(0, self.t_max, self.dt)
        self.kwargs = kwargs
        self.result = np.empty_like(self.time_sample)

    def do_integration(self):
        for counter, t in enumerate(self.time_sample):
            result[counter] = computation(self, t, **self.kwargs)


if __name__  = '__main__':
    kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
    for kwargs in kwargs_set:
        integrand_object = Integrand(**kwargs)
        save_and_plot_results(integrand_object.result)

Upvotes: 1

Related Questions