Prokhozhii

Reputation: 702

Arnaud Legoux Moving Average (ALMA) in NumPy

I'd like to write a vectorized version of code that calculates the Arnaud Legoux Moving Average (ALMA) using NumPy (or Pandas). Could you help me with this, please? Thanks.

The non-vectorized version looks like the following:

import numpy as np

def NPALMA(pnp_array, **kwargs):
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize
    s = length / sigma
    dss = 2 * s * s

    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp(-(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]

    return alma
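For intuition: the inner double loop computes a normalized Gaussian-weighted sum, and the weights depend only on i, so they can be tabulated once up front. A minimal sketch, assuming the commonly cited defaults length=9, sigma=6, offset=0.85 (these parameter values are an assumption, not taken from the question):

```python
import numpy as np

# ALMA Gaussian weights, tabulated once (assumed defaults: length=9, sigma=6, offset=0.85)
length, sigma, offset = 9, 6.0, 0.85
m = offset * (length - 1)      # peak position of the Gaussian, here 6.8
s = length / sigma
dss = 2 * s * s
i = np.arange(length)
wtd = np.exp(-(i - m) ** 2 / dss)

print(wtd.argmax())            # → 7, the index nearest the peak m = 6.8
```

Dividing by `wtd.sum()` at the end, as the loop does via `wtd_sum`, is what makes the result an average rather than a plain weighted sum.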

Upvotes: 5

Views: 3157

Answers (2)

Asclepius

Reputation: 63312

None of the variations in the previous answer by Divakar quite matched the numbers produced by TradingView's ALMA v26.0. It is essential that the numbers match a reference. With this goal in mind, the Pine Script v5 implementation of ta.alma serves as a reference even though it is not vectorized.

Here is a NumPy version which produces the same outputs as TradingView:

import numpy as np

def alma_np(prices: np.ndarray, window: int = 9, sigma: float = 6, offset: float = 0.85) -> np.ndarray:
    # Ref: https://stackoverflow.com/a/76031340/
    m = offset * (window - 1)
    s = window / sigma
    i = np.arange(window)
    weights = np.exp(-1 * np.square(i - m) / (2 * np.square(s)))
    norm_weights = weights / np.sum(weights)
    padded_prices = np.pad(prices, (window - 1, 0), mode='edge')
    alma_values = np.convolve(padded_prices, norm_weights[::-1], mode='valid')
    return alma_values
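A quick way to sanity-check this: since the weights are normalized to sum to 1 and the padding repeats the edge value, feeding a constant series must return that constant, and the output has the same length as the input. A minimal check (the function body is repeated from above so the snippet runs standalone):

```python
import numpy as np

def alma_np(prices: np.ndarray, window: int = 9, sigma: float = 6, offset: float = 0.85) -> np.ndarray:
    # Repeated from the answer above so this check is self-contained.
    m = offset * (window - 1)
    s = window / sigma
    i = np.arange(window)
    weights = np.exp(-1 * np.square(i - m) / (2 * np.square(s)))
    norm_weights = weights / np.sum(weights)
    padded_prices = np.pad(prices, (window - 1, 0), mode='edge')
    return np.convolve(padded_prices, norm_weights[::-1], mode='valid')

prices = np.full(20, 100.0)    # constant series
out = alma_np(prices)
print(out.shape)               # → (20,), same length as the input
```

Because `norm_weights` sums to 1, every entry of `out` equals 100.0 here; any deviation would indicate a bug in the padding or the kernel reversal.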

Here is a Pandas version which uses the above NumPy version:

import pandas as pd

def alma_pd(prices: pd.Series, window: int = 9, *, sigma: float = 6, offset: float = 0.85) -> pd.Series:
    # Ref: https://stackoverflow.com/a/76031340/
    prices_series = prices
    prices = prices.to_numpy()
    alma_values = alma_np(prices, window, sigma=sigma, offset=offset)
    alma_series = pd.Series(alma_values, index=prices_series.index)
    return alma_series

The values were cross-checked over 5-minute data for $SPY for 2023-04-14, matching to the fifth decimal place.

Upvotes: 1

Divakar

Reputation: 221574

Starting Approach

We can create sliding windows along the first axis and then use tensor multiplication with the array of wtd values for the sum-reductions.

The implementation would look something like this -

# Get all wtd values in an array
wtds = np.exp(-(np.arange(length) - m)**2/dss)

# Get the sliding windows for input array along first axis
pnp_array3D = strided_axis0(pnp_array,len(wtds))

# Initialize o/p array
out = np.zeros(pnp_array.shape)

# Get sum-reductions for the windows which don't need wrapping over
out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]

# Last element of the output needed wrapping. So, do it separately.
out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])

# Finally perform the divisions
out /= wtds.sum()

Function to get the sliding windows: strided_axis0 is from here.
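The original link for strided_axis0 did not survive; for completeness, an equivalent helper can be sketched with np.lib.stride_tricks.sliding_window_view (available since NumPy 1.20). That function appends the window axis last, so it is moved to axis 1 to match the tensordot call above. A sketch, not the original helper:

```python
import numpy as np

def strided_axis0(a, window):
    # Sliding windows of length `window` along axis 0;
    # result shape: (len(a) - window + 1, window, *a.shape[1:]).
    v = np.lib.stride_tricks.sliding_window_view(a, window, axis=0)
    return np.moveaxis(v, -1, 1)
```

With this shape, `np.tensordot(pnp_array3D, wtds, axes=((1),(0)))` contracts the window axis against the kernel, which is exactly what the approach above relies on.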

Boost with 1D convolution

Those multiplications with wtds values and then their sum-reductions are basically convolution along the first axis. As such, we can use scipy.ndimage.convolve1d along axis=0. This would be much faster given the memory efficiency, as we won't be creating huge sliding windows.

The implementation would be -

from scipy.ndimage import convolve1d as conv

avgs = conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

Thus, out[length-1:], which are the non-zero rows, would be the same as avgs[:-length+1].

There could be some precision difference if we are working with very small kernel values in wtds. So, keep that in mind when using this convolution method.
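As a small sanity check on the normalized kernel: because the weights are divided by their sum, convolving a constant series returns that constant regardless of the boundary mode. A minimal sketch using illustrative parameters (the same ones as in the runtime test below, not the question's data):

```python
import numpy as np
from scipy.ndimage import convolve1d as conv

length, sigma, offset = 6, 0.3, 0.5          # illustrative parameters
m = np.floor(offset * (length - 1))
dss = 2 * (length / sigma) ** 2
wtds = np.exp(-(np.arange(length) - m) ** 2 / dss)

x = np.full((10, 4), 5.0)                    # constant 2D input
avgs = conv(x, weights=wtds / wtds.sum(), axis=0, mode='wrap')
print(np.allclose(avgs, 5.0))                # → True: normalized weights preserve constants
```

A non-constant input would of course produce smoothed values, but this check isolates the normalization from the windowing logic.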

Runtime test

Approaches -

def original_app(pnp_array, length, m, dss):
    asize = length - 1
    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp(-(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]
    return alma

def vectorized_app1(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    pnp_array3D = strided_axis0(pnp_array,len(wtds))
    out = np.zeros(pnp_array.shape)
    out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
    out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
    out /= wtds.sum()
    return out

def vectorized_app2(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

Timings -

In [470]: np.random.seed(0)
     ...: m,n = 1000,100
     ...: pnp_array = np.random.rand(m,n)
     ...: 
     ...: length = 6
     ...: sigma = 0.3
     ...: offset = 0.5
     ...: 
     ...: asize = length - 1
     ...: m = np.floor(offset * asize)
     ...: s = length  / sigma
     ...: dss = 2 * s * s
     ...: 

In [471]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
10 loops, best of 3: 36.1 ms per loop
1000 loops, best of 3: 1.84 ms per loop
1000 loops, best of 3: 684 µs per loop

In [472]: np.random.seed(0)
     ...: m,n = 10000,1000 # rest same as previous one

In [473]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
1 loop, best of 3: 503 ms per loop
1 loop, best of 3: 222 ms per loop
10 loops, best of 3: 106 ms per loop

Upvotes: 4
