Reputation: 702
I'd like to write a vectorized version of code that calculates the Arnaud Legoux Moving Average (ALMA) using NumPy (or Pandas). Could you help me with this, please? Thanks.
The non-vectorized version looks like the following:
import numpy as np

def NPALMA(pnp_array, **kwargs):
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize
    s = length / sigma
    dss = 2 * s * s

    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp(-(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]
    return alma
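For reference, I call it along these lines (the array and parameter values are just illustrative):

prices = np.random.rand(100)  # placeholder 1-D price series
alma = NPALMA(prices, length=9, sigma=6.0, offset=0.85)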
Upvotes: 5
Views: 3157
Reputation: 63312
None of the variations in the previous answer by Divakar quite matched the numbers produced by TradingView's ALMA v26.0. It is essential that the numbers match a reference. With this goal in mind, the Pine Script v5 implementation of ta.alma serves as the reference, even though it is not vectorized.
Here is a NumPy version which produces the same outputs as TradingView:
import numpy as np

def alma_np(prices: np.ndarray, window: int = 9, sigma: float = 6, offset: float = 0.85) -> np.ndarray:
    # Ref: https://stackoverflow.com/a/76031340/
    m = offset * (window - 1)  # center of the Gaussian kernel
    s = window / sigma         # width of the Gaussian kernel
    i = np.arange(window)
    weights = np.exp(-1 * np.square(i - m) / (2 * np.square(s)))
    norm_weights = weights / np.sum(weights)
    # Pad on the left with the first price so the output has the same length as the input.
    padded_prices = np.pad(prices, (window - 1, 0), mode='edge')
    # np.convolve flips its kernel, so pass the reversed weights to get a correlation.
    alma_values = np.convolve(padded_prices, norm_weights[::-1], mode='valid')
    return alma_values
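For example, applied to a short, made-up price array (the numbers are arbitrary and purely illustrative):

prices = np.array([10.0, 10.5, 10.3, 10.8, 11.0, 10.9, 11.2, 11.5, 11.4, 11.6])
print(alma_np(prices, window=9))  # same length as the input, thanks to the edge padding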
Here is a Pandas version which uses the above NumPy version:
import pandas as pd

def alma_pd(prices: pd.Series, window: int = 9, *, sigma: float = 6, offset: float = 0.85) -> pd.Series:
    # Ref: https://stackoverflow.com/a/76031340/
    prices_series = prices
    prices = prices.to_numpy()
    alma_values = alma_np(prices, window, sigma=sigma, offset=offset)
    alma_series = pd.Series(alma_values, index=prices_series.index)
    return alma_series
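Usage is analogous, and the index of the input Series is preserved (the data below is made up purely for illustration):

s = pd.Series([10.0, 10.5, 10.3, 10.8, 11.0],
              index=pd.date_range('2023-04-14', periods=5, freq='5min'))
print(alma_pd(s, window=3))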
The values were cross-checked against TradingView over 5-minute data for $SPY for 2023-04-14, matching to the fifth decimal place.
Upvotes: 1
Reputation: 221574
Starting Approach
We can create sliding windows along the first axis and then use tensor multiplication with the range of wtd values for the sum-reductions.
The implementation would look something like this -
# Get all wtd values in an array
wtds = np.exp(-(np.arange(length) - m)**2 / dss)
# Get the sliding windows for the input array along the first axis
pnp_array3D = strided_axis0(pnp_array, len(wtds))
# Initialize o/p array
out = np.zeros(pnp_array.shape)
# Get sum-reductions for the windows which don't need wrapping over
out[length:] = np.tensordot(pnp_array3D, wtds, axes=((1), (0)))[:-1]
# The last computed element of the output needs wrapping. So, do it separately.
out[length-1] = wtds.dot(pnp_array[np.r_[-1, range(length-1)]])
# Finally perform the divisions
out /= wtds.sum()
The function to get the sliding windows, strided_axis0, is from here.
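For completeness, a minimal sketch of such a strided_axis0 for a 2D input, built on np.lib.stride_tricks.as_strided (my reconstruction; the linked code may differ in details):

import numpy as np

def strided_axis0(a, L):
    # View of all length-L windows along axis 0 of the 2D array `a`,
    # with shape (a.shape[0] - L + 1, L, a.shape[1]). No data is copied.
    s0, s1 = a.strides
    nrows = a.shape[0] - L + 1
    return np.lib.stride_tricks.as_strided(
        a, shape=(nrows, L, a.shape[1]), strides=(s0, s0, s1))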
Boost with 1D convolution
Those multiplications with the wtds values and then their sum-reductions are basically a convolution along the first axis. As such, we can use scipy.ndimage.convolve1d along axis=0. This would be much faster given the memory efficiency, as we won't be creating huge sliding windows.
The implementation would be -
from scipy.ndimage import convolve1d as conv
avgs = conv(pnp_array, weights=wtds/wtds.sum(), axis=0, mode='wrap')
Thus, out[length-1:], which holds the non-zero rows, would be the same as avgs[:-length+1].
There could be some precision difference if we are working with really small kernel numbers from wtds. So, keep that in mind if using this convolution method.
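That equivalence is easy to sanity-check, assuming out from the sliding-window approach and avgs from the convolution were computed on the same pnp_array:

print(np.allclose(out[length-1:], avgs[:-length+1]))  # expect True, up to floating-point precision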
Runtime test
Approaches -
def original_app(pnp_array, length, m, dss):
    asize = length - 1
    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)
    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp(-(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]
    return alma
def vectorized_app1(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2 / dss)
    pnp_array3D = strided_axis0(pnp_array, len(wtds))
    out = np.zeros(pnp_array.shape)
    out[length:] = np.tensordot(pnp_array3D, wtds, axes=((1), (0)))[:-1]
    out[length-1] = wtds.dot(pnp_array[np.r_[-1, range(length-1)]])
    out /= wtds.sum()
    return out

def vectorized_app2(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2 / dss)
    return conv(pnp_array, weights=wtds/wtds.sum(), axis=0, mode='wrap')
Timings -
In [470]: np.random.seed(0)
...: m,n = 1000,100
...: pnp_array = np.random.rand(m,n)
...:
...: length = 6
...: sigma = 0.3
...: offset = 0.5
...:
...: asize = length - 1
...: m = np.floor(offset * asize)
...: s = length / sigma
...: dss = 2 * s * s
...:
In [471]: %timeit original_app(pnp_array, length, m, dss)
...: %timeit vectorized_app1(pnp_array, length, m, dss)
...: %timeit vectorized_app2(pnp_array, length, m, dss)
...:
10 loops, best of 3: 36.1 ms per loop
1000 loops, best of 3: 1.84 ms per loop
1000 loops, best of 3: 684 µs per loop
In [472]: np.random.seed(0)
...: m,n = 10000,1000 # rest same as previous one
In [473]: %timeit original_app(pnp_array, length, m, dss)
...: %timeit vectorized_app1(pnp_array, length, m, dss)
...: %timeit vectorized_app2(pnp_array, length, m, dss)
...:
1 loop, best of 3: 503 ms per loop
1 loop, best of 3: 222 ms per loop
10 loops, best of 3: 106 ms per loop
Upvotes: 4