anirudhc1229
anirudhc1229

Reputation: 65

How to implement multiprocessing in Monte Carlo integration

I created a Python program that integrates a given function over a given interval using Monte Carlo simulation. It works well, except for the fact that it runs painfully slow when you want higher levels of accuracy (larger N value). I figured I'd give multiprocessing a try in order to speed it up, but then I realized I have no clue how to implement it. Here's what I have right now:

from scipy import random
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Process
import os

# GOAL: Approximate the integral of a function f(x) from lower bound a to upper bound b using Monte Carlo simulation

# bounds of integration
a = 0
b = np.pi

# function to integrate
def f(x):
    return np.sin(x)

N = 10000
areas = []


def mcIntegrate():
    
    for i in range(N):
        
        # array filled with random numbers between limits
        xrand = random.uniform(a, b, N)
        
        # sum the return values of the function of each random number
        integral = 0.0
        for i in range(N):
            integral += f(xrand[i])
        
        # scale integral by difference of bounds divided by amount of random values
        ans = integral * ((b - a) / float(N))
        
        # add approximation to list of other approximations
        areas.append(ans)


if __name__ == "__main__":

    processes = []
    numProcesses = os.cpu_count()

    for i in range(numProcesses):
        process = Process(target=mcIntegrate)
        processes.append(process)

    for process in processes:
        process.start()

    for process in processes:
        process.start()

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()

Can I get some help with this implementation?


Took advice from the comments and used multiprocessor.Pool, and also cut down on some operations by using NumPy instead. Went from taking about 5min to run to now about 6sec (for N = 10000). Here's my implementation:

import scipy
import numpy as np
import matplotlib.pyplot as plt
import multiprocessing
import os

# GOAL: Approximate the integral of function f from lower bound a to upper bound b using Monte Carlo simulation

a = 0       # lower bound of integration
b = np.pi   # upper bound of integration
f = np.sin  # function to integrate
N = 10000   # sample size

def mcIntegrate(p):
    xrand = scipy.random.uniform(a, b, N)       # create array filled with random numbers within bounds
    integral = np.sum(f(xrand))                 # sum return values of function of each random number
    approx = integral * ((b - a) / float(N))    # scale integral by difference of bounds divided by sample size
    return approx


if __name__ == "__main__":
    
    # run simulation N times in parallel and store results in array
    with multiprocessing.Pool(os.cpu_count()) as pool:
        areas = pool.map(mcIntegrate, range(N))

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()

Upvotes: 2

Views: 2338

Answers (2)

anon01
anon01

Reputation: 11171

My experience is that multiprocessing is often not very efficient (a ton of overhead). The more you push your code into numpy the faster it will be, with one caveat; you can overload your memory if you're not careful (10k x 10k is getting large). Lastly, it looks like N is doing double duty, both defining sample size for each estimate, and also serving as the number of trial estimates.

Here is how I would do this (with minor style changes):

import numpy as np
f = np.sin
a = 0
b = np.pi

# number samples for each trial, trial count, and number calculated at once
N = 10000
TRIALS = 10000
BATCH_SIZE=1000

def mc_integrate(f, a, b, N, batch_size=BATCH_SIZE):
    # compute everything carrying `batch_size` copies by extending the array dimension.

    # samples.shape == (N, batch_size)
    samples = np.random.uniform(a, b, size=(N, batch_size))

    integrals = np.sum(f(samples), axis=0)
    mc_estimates = integrals * ((b - a) / N)    
    return mc_estimates


# loop over batch values to get final result
n, r = divmod(TRIALS, BATCH_SIZE)
results = []
for j in [BATCH_SIZE]*n + [r]:
    results.extend(mc_integrate(f, a, b, N, batch_size=j))

On my machine this takes a few seconds.

Upvotes: 1

2e0byo
2e0byo

Reputation: 5954

This turned out to be a more interesting problem than I thought it would when I got to optimising it. The basic method is very simple:

from multiprocessing import pool

def f(x):
    return x

results = pool.map(f, range(100))

Here is your mcIntegerate adapted for multiprocessing:

from tqdm import tqdm

def mcIntegrate(steps):
    tasks = []

    print("Setting up simulations")

    # linear
    for _ in tqdm(range(steps)):
        xrand = random.uniform(a, b, steps)
        for i in range(steps):
            tasks.append(xrand[i])

    pool = Pool(cpu_count())

    print("Simulating (no progress)")
    results = pool.map(f, tasks)
    pool.close()

    print("summing")
    areas = []
    for chunk in tqdm(range(steps)):
        vals = results[chunk * steps : (chunk + 1) * steps]
        integral = sum(vals)
        ans = integral * ((b - a) / float(steps))
        areas.append(ans)

    return areas

tqdm is just used to display a progress bar.

This is the basic workflow for multiprocessing: break the question up into tasks, solve all the tasks, then add them all back together again. And indeed the code as given works. (Note that I've changed your N for steps).

For completeness, the script now begins:

from scipy import random
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
from tqdm import tqdm

# function to integrate
def f(x):
    return np.sin(x)

and ends

areas = mcIntegrate(3_000)

a = 0
b = np.pi

plt.title("Distribution of Approximated Integrals")
plt.hist(areas, bins=30, ec="black")
plt.xlabel("Areas")
plt.show()

Optimisation

I deliberately split the problem up at the smallest possible level. Was this a good idea? To answer that, consider: how might we optimise the linear process of generating the tasks? This does take a considerable while at the moment. We could parallelise it:

def _prepare(steps):
    xrand = random.uniform(a, b, steps)
    return [xrand[i] for i in range(steps)]

def mcIntegrate(steps):
    ...
    tasks = []
    for res in tqdm(pool.imap(_prepare, (steps for _ in range(steps))), total=steps):
        tasks += res  # slower except for very large steps

Here I've used pool.imap, which returns an iterator which we can iterate as soon as the results are available, allowing us to build a progress bar. If you do this and compare, you will see that it runs slower than the linear solution. Removing the progress bar (on my machine) and replace with:

    import time

    start = time.perf_counter()
    results = pool.map(_prepare, (steps for _ in range(steps)))
    tasks = []
    for res in results:
        tasks += res
    print(time.perf_counter() - start)

Is only marginally faster: it's still slower than running linear. Serialising data to a process and then deserialising it has an overhead. If you try to get a progress bar on the whole thing, it becomes excruciatingly slow:

    results = []
    for result in tqdm(pool.imap(f, tasks), total=len(tasks)):
        results.append(result)

So what about iterating at a higher level? Here's another adaption of your mcIterate:

a = 0
b = np.pi

def _mcIntegrate(steps):
    xrand = random.uniform(a, b, steps)
    integral = 0.0
    for i in range(steps):
        integral += f(xrand[i])
    ans = integral * ((b - a) / float(steps))

    return ans


def mcIntegrate(steps):
    areas = []
    p = Pool(cpu_count())
    for ans in tqdm(p.imap(_mcIntegrate, ((steps) for _ in range(steps))), total=steps):
        areas.append(ans)

    return areas

This, on my machine, is considerably faster. It's also considerably simpler. I was expecting a difference, but not such a considerable difference.

Takeaways

Multiprocessing isn't free. Something as simple as np.sin() is too cheap to multprocess: we pay to serialise, deserialise, append, and so on, all for one sin() calculation. But if you do too many calculations, you will waste time as you lose granularity. Here the effect is more striking than I was expecting. The only way to know the right level of granularity for a particular problem... is to profile and try.

Upvotes: 4

Related Questions