pir
pir

Reputation: 5923

Speeding up normal distribution probability mass allocation

We have N users with P avg. points per user, where each point is a single value between 0 and 1. We need to distribute the mass of each point using a normal distribution with a known density of 0.05 as the points have some uncertainty. Additionally, we need to wrap the mass around 0 and 1 such that e.g. a point at 0.95 will also allocate mass around 0. I've provided a working example below, which bins the normal distribution into D=50 bins. The example uses the Python typing module, but you can ignore that if you'd like.

from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt

D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()


def probability_mass(distribution: Any, x0: float, x1: float) -> float:
    """
    Computes the area under the distribution, wrapping at 1.
    The wrapping is done by adding the PDF at +- 1.
    """
    assert x1 > x0
    return (
        (distribution.cdf(x1) - distribution.cdf(x0))
        + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
        + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
    )


def point_density(x: float) -> List[float]:
    distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
    density: List[float] = []
    for i in range(D):
        density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
    return density


def user_density(points: List[float]) -> Any:

    # Find the density of each point
    density: Any = np.array([point_density(p) for p in points])

    # Combine points and normalize
    combined = density.sum(axis=0)
    return combined / combined.sum()


if __name__ == "__main__":

    # Example for one user
    data: List[float] = [.05, .3, .5, .5]
    density = user_density(data)

    # Example for multiple users (N = 2)
    print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])

    ### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
    ### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
    middle: List[float] = []
    for i in range(D):
        middle.append((BINS[i] + BINS[i + 1]) / 2)
    plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
    plt.xlim(0, 1)
    plt.xlabel("x")
    plt.ylabel("Density")
    plt.show()

matplotlib output

In this example N=1, D=50, P=4. However, we want to scale this approach to N=10000 and P=100 while being as fast as possible. It's unclear to me how we'd vectorize this approach. How do we best speed up this?

EDIT

The faster solution can have slightly different results. For instance, it could approximate the normal distribution instead of using the precise normal distribution.

EDIT2

We only care about computing density using the user_density() function. The plot is only to help explain the approach. We do not care about the plot itself :)

EDIT3

Note that P is the avg. points per user. Some users may have more and some may have less. If it helps, you can assume that we can throw away points such that all users have a max of 2 * P points. It's fine to ignore this part while benchmarking as long as the solution can handle a flexible # of points per user.

Upvotes: 5

Views: 756

Answers (3)

tstanisl
tstanisl

Reputation: 14127

You could get below 50ms for largest case (N=10000, AVG[P]=100, D=50) by using using FFT and creating data in numpy friendly format. Otherwise it will be closer to 300 msec.

The idea is to convolve a single normal distribution centered at 0 with a series Dirac deltas.

See image below: enter image description here

Using circular convolution solves two issues.

  • naturally deals with wrapping at the edges
  • can be efficiently computed with FFT and Convolution Theorem

First one must create a distribution to be copied. Function mk_bell() created a histogram of a normal distribution of stddev 0.05 centered at 0. The distribution wraps around 1. One could use arbitrary distribution here. The spectrum of the distribution is computed are used for fast convolution.

Next a comb-like function is created. The peaks are placed at indices corresponding to peaks in user density. E.g.

peaks_location = [0.1, 0.3, 0.7]
D = 10

maps to

peak_index = (D * peak_location).astype(int) = [1, 3, 7]
dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]

You can quickly create a composition of Diract Deltas by computing indices of the bins for each peak location with help of np.bincount() function. In order to speed things even more one can compute comb-functions for user-peaks in parallel.

Array dist is 2D-array of shape NxD. It can be linearized to 1D array of shape (N*D). After this change element on position [user_id, peak_index] will be accessible from index user_id*D + peak_index. With numpy-friendly input format (described below) this operation is easily vectorized.

The convolution theorem says that spectrum of convolution of two signals is equal to product of spectrums of each signal.

The spectrum is compute with numpy.fft.rfft which is a variant of Fast Fourier Transfrom dedicated to real-only signals (no imaginary part).

Numpy allows to compute FFT of each row of the larger matrix with one command.

Next, the spectrum of convolution is computed by simple multiplication and use of broadcasting.

Next, the spectrum is computed back to "time" domain by Inverse Fourier Transform implemented in numpy.fft.irfft.

To use the full speed of numpy one should avoid variable size data structure and keep to fixed size arrays. I propose to represent input data as three arrays.

  • uids the identifier for user, integer 0..N-1
  • peaks, the location of the peak
  • mass, the mass of the peek, currently it is 1/numer-of-peaks-for-user

This representation of data allows quick vectorized processing. Eg:

user_data = [[0.1, 0.3], [0.5]]

maps to:

uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
peaks = [0.1, 0.3, 0.5] # serialized user_data
mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0

The code:

import numpy as np
import matplotlib.pyplot as plt
import time

def mk_bell(D, SIGMA):
    # computes normal distribution wrapped and centered at zero
    x = np.linspace(0, 1, D, endpoint=False);
    x = (x + 0.5) % 1 - 0.5
    bell = np.exp(-0.5*np.square(x / SIGMA))
    return bell / bell.sum()

def user_densities_by_fft(uids, peaks, mass, D, N=None):
    bell = mk_bell(D, 0.05).astype('f4')
    sbell = np.fft.rfft(bell)
    if N is None:
        N = uids.max() + 1
    # ensure that peaks are in [0..1) internal
    peaks = peaks - np.floor(peaks)
    # convert peak location from 0-1 to the indices
    pidx = (D * (peaks + uids)).astype('i4')
    dist = np.bincount(pidx, mass, N * D).reshape(N, D)
    # process all users at once with Convolution Theorem
    sdist = np.fft.rfft(dist)
    sdist *= sbell
    res = np.fft.irfft(sdist)

    return res

def generate_data(N, Pmean):
    # generateor for large data
    data = []
    for n in range(N):
        # select P uniformly from 1..2*Pmean
        P = np.random.randint(2 * Pmean) + 1
        # select peak locations
        chunk = np.random.uniform(size=P)
        data.append(chunk.tolist())
    return data

def make_data_numpy_friendly(data):
    uids = []
    chunks = []
    mass = []
    for uid, peaks in enumerate(data):
        uids.append(np.full(len(peaks), uid))
        mass.append(np.full(len(peaks), 1 / len(peaks)))
        chunks.append(peaks)
    return np.hstack(uids), np.hstack(chunks), np.hstack(mass)




D = 50

# demo for simple multi-distribution
data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
uids, peaks, mass = make_data_numpy_friendly(data)
dist = user_densities_by_fft(uids, peaks, mass, D, N)
plt.plot(dist.T)
plt.show()

# the actual measurement
N = 10000
P = 100
data = generate_data(N, P)

tic = time.time()
uids, peaks, mass = make_data_numpy_friendly(data)
toc = time.time()
print(f"make_data_numpy_friendly: {toc - tic}")

tic = time.time()
dist = user_densities_by_fft(uids, peaks, mass, D, N)
toc = time.time()
print(f"user_densities_by_fft: {toc - tic}")

The results on my 4-core Haswell machine are:

make_data_numpy_friendly: 0.2733159065246582
user_densities_by_fft: 0.04064297676086426

It took 40ms to process the data. Notice that processing data to numpy friendly format takes 6 times more time than the actual computation of distributions. Python is really slow when it comes to looping. Therefore I strongly recommend to generate input data directly in numpy-friendly way in the first place.

There are some issues to be fixed:

  • precision, can be improved by using larger D and downsampling
  • accuracy of peak location could be further improved by widening the spikes.
  • performance, scipy.fft offers move variants of FFT implementation that may be faster

Upvotes: 5

Troy D
Troy D

Reputation: 2245

I was able to reduce the time from about 4 seconds per sample of 100 datapoints to about 1 ms per sample.

It looks to me like you're spending quite a lot of time simulating a very large number of normal distributions. Since you're dealing with a very large sample size anyway, you may as well just use standard normal distribution values, because it'll all just average out anyway.

I recreated your approach (BaseMethod class), then created an optimized class (OptimizedMethod class), and evaluated them using a timeit decorator. The primary difference in my approach is the following line:

    # Generate a standardized set of values to add to each sample to simulate normal distribution
    self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])

This creates a generic set of datapoints based on an inverse normal cumulative distribution function that we can add to each datapoint to simulate a normal distribution around that point. Then we just reshape the data into user samples and run np.histogram on the samples.

import numpy as np
import scipy.stats
from scipy.stats import norm
import time

# timeit decorator for evaluating performance
def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        print('%r  %2.2f ms' % (method.__name__, (te - ts) * 1000 ))
        return result
    return timed

# Define Variables
N = 10000
D = 50
P = 100

# Generate sample data
np.random.seed(0)
data = np.random.rand(N, P)

# Run OP's method for comparison
class BaseMethod:
    def __init__(self, d=50):
        self.d = d
        self.bins = np.linspace(0, 1, d + 1).tolist()

    def probability_mass(self, distribution, x0, x1):
        """
        Computes the area under the distribution, wrapping at 1.
        The wrapping is done by adding the PDF at +- 1.
        """
        assert x1 > x0
        return (
            (distribution.cdf(x1) - distribution.cdf(x0))
            + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
            + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
        )

    def point_density(self, x):
        distribution = scipy.stats.norm(loc=x, scale=0.05)
        density = []
        for i in range(self.d):
            density.append(self.probability_mass(distribution, self.bins[i], self.bins[i + 1]))
        return density

    @timeit
    def base_user_density(self, data):
        n = data.shape[0]
        density = np.empty((n, self.d))
        for i in range(data.shape[0]):
            # Find the density of each point
            row_density = np.array([self.point_density(p) for p in data[i]])
            # Combine points and normalize
            combined = row_density.sum(axis=0)
            density[i, :] = combined / combined.sum()
        return density


base = BaseMethod(d=D)
# Only running base method on first 2 rows of data because it's slow
density = base.base_user_density(data[:2])
print(density[:2, :5])


class OptimizedMethod:

    def __init__(self, d=50, norm_val_n=50):
        self.d = d
        self.norm_val_n = norm_val_n
        self.bins = np.linspace(0, 1, d + 1).tolist()

        # Generate a standardized set of values to add to each sample to simulate normal distribution
        self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])

    @timeit
    def optimized_user_density(self, data):

        samples = np.empty((data.shape[0], data.shape[1], self.norm_val_n - 1))
        # transform datapoints to normal distributions around datapoint
        for i in range(self.norm_vals.shape[0]):
            samples[:, :, i] = data + self.norm_vals[i]
        samples = samples.reshape(samples.shape[0], -1)

        #wrap around [0, 1]
        samples = samples % 1

        #loop over samples for density
        density = np.empty((data.shape[0], self.d))
        for i in range(samples.shape[0]):
            hist, bins = np.histogram(samples[i], bins=self.bins)
            density[i, :] = hist / hist.sum()

        return density


om = OptimizedMethod()
#Run optimized method on first 2 rows for apples to apples comparison
density = om.optimized_user_density(data[:2])
#Run optimized method on full data
density = om.optimized_user_density(data)
print(density[:2, :5])

Running on my system, the original method took about 8.4 seconds to run on 2 rows of data, while the optimized method took 1 millisecond to run on 2 rows of data and completed 10,000 rows in 4.7 seconds. I printed the first five values of the first 2 samples for each method.

'base_user_density'  8415.03 ms
[[0.02176227 0.02278653 0.02422535 0.02597123 0.02745976]
 [0.0175103  0.01638513 0.01524853 0.01432158 0.01391156]]
'optimized_user_density'  1.09 ms
'optimized_user_density'  4755.49 ms
[[0.02142857 0.02244898 0.02530612 0.02612245 0.0277551 ]
 [0.01673469 0.01653061 0.01510204 0.01428571 0.01326531]]

Upvotes: 4

Quang Hoang
Quang Hoang

Reputation: 150735

This would be my vectorized approach:

data = np.array([0.05, 0.3, 0.5, 0.5])

np.random.seed(31415)
# random noise
randoms = np.random.normal(0,1,(len(data), int(1e5))) * 0.05

# samples with noise
samples = data[:,None] + randoms

# wrap [0,1]
samples = (samples % 1).ravel()


# histogram
hist, bins, patches = plt.hist(samples, bins=BINS, density=True)

Output:

enter image description here

Upvotes: 4

Related Questions