Wouter

Reputation: 3261

Speeding up random number generation by parallelizing

I need to create many large numpy arrays of shape (4e6, 100), filled with random numbers from a standard normal distribution, and I'm trying to speed this up. I tried to generate different parts of the arrays using multiple cores, but I'm not seeing the expected speed improvements. Is there something I'm doing wrong, or am I wrong to expect speed improvements in this way?

from numpy.random import default_rng
from multiprocessing import Pool
from time import time


def rng_mp(rng):
    return rng.standard_normal((250000, 100))


if __name__ == '__main__':

    n_proc = 4
    rngs = [default_rng(n) for n in range(n_proc)]
    rng_all = default_rng(1)

    start = time()
    result = rng_all.standard_normal((int(1e6), 100))
    print(f'Single process: {time() - start:.3f} seconds')

    start = time()
    with Pool(processes=n_proc) as p:
        result = p.map_async(rng_mp, rngs).get()
    print(f'MP: {time() - start:.3f} seconds')

    # Single process: 1.114 seconds
    # MP: 2.634 seconds

Upvotes: 0

Views: 798

Answers (4)

Wouter

Reputation: 3261

The logic from my other answer is now implemented in the package mtalg, which is designed for generating random numbers with multithreading.

from mtalg.random import MultithreadedRNG
mrng = MultithreadedRNG(seed=1, num_threads=4)
mrng.standard_normal(size=(int(1e6), 100))

Upvotes: 0

Wouter

Reputation: 3261

I'm obliged to the other contributors for their suggestions, but I found an approach that is faster still: it fills an existing array instead of allocating new ones. It's an adaptation of the example in the numpy documentation, optimized for 2D arrays.
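The key mechanism, shown in isolation (a minimal sketch of my own, not part of the class that follows): `Generator.standard_normal` accepts an `out=` argument, so each thread can write into its own slice of a shared, pre-allocated buffer.

```python
import numpy as np

# Pre-allocate a buffer once; standard_normal(out=...) fills it in place,
# so no new array is created per call.
buf = np.empty((4, 3))
rng = np.random.default_rng(1)
rng.standard_normal(out=buf[0:2])  # fill only the first two rows in place
```

The class below applies exactly this trick, splitting the rows across threads so each one fills its own disjoint slice of `self.values`.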

from numpy.random import default_rng, SeedSequence
import multiprocessing
import concurrent.futures
import numpy as np
from time import time


class MultithreadedRNG2D:
    def __init__(self, shape, seed=None, threads=None):
        if threads is None:
            threads = multiprocessing.cpu_count()
        self.threads = threads

        seq = SeedSequence(seed)
        self._random_generators = [default_rng(s)
                                   for s in seq.spawn(threads)]

        self.shape = shape
        self.executor = concurrent.futures.ThreadPoolExecutor(threads)
        self.values = np.empty(shape)
        self.steps = [(t * (shape[0] // threads), (t + 1) * (shape[0] // threads))
                      if t < (threads - 1)
                      else (t * (shape[0] // threads), shape[0])
                      for t in range(threads)]

    def fill(self):
        def _fill(random_state, out, firstrow, lastrow):
            random_state.standard_normal(out=out[firstrow:lastrow])

        futures = {}
        for i in range(self.threads):
            args = (_fill,
                    self._random_generators[i],
                    self.values,
                    self.steps[i][0],
                    self.steps[i][1])
            futures[self.executor.submit(*args)] = i
        concurrent.futures.wait(futures)

    def __del__(self):
        self.executor.shutdown(False)


mrng = MultithreadedRNG2D((int(1e6), 100), seed=1, threads=4)
start = time()
mrng.fill()
print(f'MT: {time() - start:.3f} seconds')

# MT: 0.336 seconds

Upvotes: 0

Booboo

Reputation: 44043

I suspected the slowdown results simply from the fact that you need to move lots of data from the address spaces of the subprocesses back to the main process. I also suspected that the C-language implementation numpy uses for random number generation releases the Global Interpreter Lock (GIL), so that using multithreading instead of multiprocessing would solve your performance problem:

from numpy.random import default_rng
from multiprocessing.pool import ThreadPool
from time import time


def rng_mp(rng):
    return rng.standard_normal((250000, 100))


if __name__ == '__main__':

    n_proc = 4
    rngs = [default_rng(n) for n in range(n_proc)]
    rng_all = default_rng(1)

    start = time()
    result = rng_all.standard_normal((int(1e6), 100))
    print(f'Single process: {time() - start:.3f} seconds')

    start = time()
    with ThreadPool(processes=n_proc) as p:
        result = p.map_async(rng_mp, rngs).get()
    print(f'MT: {time() - start:.3f} seconds')

Prints:

Single process: 1.210 seconds
MT: 0.413 seconds

Upvotes: 2

user2668284

Reputation:

This is not meant as an answer to the original question - more of a follow-up that raises more questions than I can answer.

I have rearranged the code to try to see what's really going on here.

from numpy.random import default_rng
from concurrent.futures import ProcessPoolExecutor
import time

NPROC = 4

def rng_mp(i):
    s = time.perf_counter()
    r = default_rng(i).standard_normal((250000, 100))
    e = time.perf_counter()
    print(f'Process {i} {e-s:.2f}s')
    return r


if __name__ == '__main__':
    start = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        for fr in [executor.submit(rng_mp, i) for i in range(NPROC)]:
            s = time.perf_counter()
            fr.result()
            e = time.perf_counter()
            print(f'Result time {e-s:.2f}')
    end = time.perf_counter()
    print(f'Overall {end - start:.3f} seconds')

A typical output from this is as follows:

Process 0 0.33s
Process 2 0.33s
Process 1 0.33s
Process 3 0.33s
Result time 2.27
Result time 5.57
Result time 0.00
Result time 0.00
Overall 7.999 seconds

In other words, each rng_mp() call executes in good time. BUT the delay appears to be in acquiring the result, which I can only guess has something to do with moving large amounts of memory between the subprocesses and the main process. FWIW I'm running numpy 1.21.4 with Python 3.9.8 on macOS 12.0.1. I cannot explain this.
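One way to sanity-check the transfer hypothesis (my addition, not part of the original post): multiprocessing serializes results with pickle, so timing a pickle round-trip of one worker-sized array gives a rough lower bound on the per-result overhead. A sketch:

```python
import pickle
import time
import numpy as np

# One worker-sized array: 250000 * 100 * 8 bytes = 200 MB of float64.
arr = np.random.default_rng(0).standard_normal((250000, 100))

start = time.perf_counter()
payload = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
roundtrip = pickle.loads(payload)
elapsed = time.perf_counter() - start
print(f'pickle round-trip of {arr.nbytes / 1e6:.0f} MB: {elapsed:.2f}s')
```

The actual cost in ProcessPoolExecutor is higher still, since the bytes also pass through a pipe between processes rather than staying in one address space.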

UPDATE: Based on the answer from @Booboo I changed to using the ThreadPoolExecutor (no other changes needed) with the following results:

Process 3 0.34s
Process 1 0.35s
Process 0 0.35s
Result time 0.35
Result time 0.00
Process 2 0.35s
Result time 0.00
Result time 0.00
Overall 0.388 seconds
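For reference, the swapped-in executor looks like this (a sketch of my own; the shape is reduced here so it runs quickly, where the original used (250000, 100)):

```python
from numpy.random import default_rng
from concurrent.futures import ThreadPoolExecutor
import time

NPROC = 4

def rng_mp(i):
    # Same worker as above, smaller shape to keep the sketch light.
    return default_rng(i).standard_normal((25000, 100))

start = time.perf_counter()
# The only change: ThreadPoolExecutor instead of ProcessPoolExecutor.
with ThreadPoolExecutor() as executor:
    results = [f.result() for f in [executor.submit(rng_mp, i) for i in range(NPROC)]]
print(f'Overall {time.perf_counter() - start:.3f} seconds')
```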

Upvotes: 1
