Reputation: 61
I want to apply a weighted rolling average to a large timeseries, stored as a pandas dataframe, where the weights are different for each day. Here's a subset of the dataframe:
DF:
Date v_std vertical
2010-10-01 1.909 545.231
2010-10-02 1.890 538.610
2010-10-03 1.887 542.759
2010-10-04 1.942 545.221
2010-10-05 1.847 536.832
2010-10-06 1.884 538.858
2010-10-07 1.864 538.017
2010-10-08 1.833 540.737
2010-10-09 1.847 537.906
2010-10-10 1.881 538.210
2010-10-11 1.868 544.238
2010-10-12 1.856 534.878
I want to take a rolling average of the vertical column using v_std as the weights. I've been using this weighted average function:
def wavg(group, avg_name, weight_name):
    d = group[avg_name]
    w = group[weight_name]
    try:
        return (d * w).sum() / w.sum()
    except ZeroDivisionError:
        return d.mean()
But I can't figure out how to implement this for a rolling weighted average. I assume it is similar to
df.rolling(window = 7).apply(wavg, "vertical", "v_std")
or utilizing rolling_apply? Or will I have to write a new function altogether? Thank you!
Upvotes: 6
Views: 18109
Reputation: 141
Here is a solution using numpy_ext.rolling_apply():
import pandas as pd
import numpy as np
import numpy_ext as npe
import time
df = pd.DataFrame(columns=["Date", "v_std", "vertical"],
data = ((pd.to_datetime(x[:10]), float(x[10:17]), float(x[17:]))
for x in
"""2010-10-01 1.909 545.231
2010-10-02 1.890 538.610
2010-10-03 1.887 542.759
2010-10-04 1.942 545.221
2010-10-05 1.847 536.832
2010-10-06 1.884 538.858
2010-10-07 1.864 538.017
2010-10-08 1.833 540.737
2010-10-09 1.847 537.906
2010-10-10 1.881 538.210
2010-10-11 1.868 544.238
2010-10-12 1.856 534.878""".split("\n"))).set_index("Date")
window = 7
# === Method 1 (less efficient)
def wavg1(vertical, v_std):
    return np.dot(v_std, vertical) / np.sum(v_std)
t = time.time()
df["wavg1"] = npe.rolling_apply(wavg1, window, df["vertical"], df["v_std"])
print(f"Method 1: {time.time()-t:6f} seconds")
# === Method 2 (more efficient but requires creating extra column)
def wavg2(v_std_vertical, v_std):
    return v_std_vertical.sum() / np.sum(v_std)
df["v_std_vertical"] = df["v_std"].mul(df["vertical"])
t = time.time()
df["wavg2"] = npe.rolling_apply(wavg2, window, df["v_std_vertical"], df["v_std"])
print(f"Method 2: {time.time()-t:6f} seconds (not counting adding new column)")
print("\n", df)
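If installing numpy_ext is not an option, the idea behind Method 2 can probably be written with pandas alone as a ratio of two rolling sums. This is only a minimal sketch: it rebuilds the first eight rows of the question's data by hand, and the column name wavg is my own choice.

```python
import numpy as np
import pandas as pd

# First eight rows of the question's data, typed in by hand.
df = pd.DataFrame(
    {
        "v_std": [1.909, 1.890, 1.887, 1.942, 1.847, 1.884, 1.864, 1.833],
        "vertical": [545.231, 538.610, 542.759, 545.221,
                     536.832, 538.858, 538.017, 540.737],
    },
    index=pd.date_range("2010-10-01", periods=8, freq="D"),
)

window = 7

# Weighted mean of a window = sum(w * x) / sum(w); both sums are rolling sums.
weighted_sum = (df["vertical"] * df["v_std"]).rolling(window).sum()
weight_sum = df["v_std"].rolling(window).sum()
df["wavg"] = weighted_sum / weight_sum

# Cross-check the first complete window against np.average.
expected = np.average(df["vertical"].iloc[:window], weights=df["v_std"].iloc[:window])
assert np.isclose(df["wavg"].iloc[window - 1], expected)
print(df["wavg"])
```

The first window - 1 rows come out as NaN because the default min_periods equals the window size.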
Upvotes: 0
Reputation: 1
Based on orherman's answer, I created the following class, which should be easier to use and has an API similar to DataFrame.rolling():
from pandas.core.window.rolling import RollingAndExpandingMixin
class RollingWeightedAverageDataFrame:
    def __init__(self, df):
        self.df = df
        self.col_names = list(df.columns)
        assert len(self.col_names) == 2, "Unexpected input, dataframe should have 2 columns"
    def rolling(self, window, min_periods):
        self.window = window
        self.min_periods = min_periods
        return self
    def weighted_average(self):
        self.df['mul'] = self.df[self.col_names[0]] * self.df[self.col_names[1]]
        def _weighted_average(x):
            return (x['mul'].sum() / x[self.col_names[1]].sum())
        RollingAndExpandingMixin.weighted_average = _weighted_average
        return self.df[[self.col_names[0], self.col_names[1], 'mul']].rolling(window=self.window, min_periods=self.min_periods).weighted_average()
Suppose in your code you have a dataframe with columns 'value' and 'weight', and you want a window of 7 and a minimum of 5 periods, just add the following:
df['wavg'] = (
    RollingWeightedAverageDataFrame(df[['value', 'weight']])
    .rolling(window=7, min_periods=5)
    .weighted_average()
)
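If patching a pandas internal class feels too fragile, a plain function can likely do the same job. The sketch below is an assumption-laden alternative, not the class above: rolling_weighted_average is a hypothetical helper name, the sample data is made up, and the fallback to the unweighted mean mirrors the intent of the question's wavg (dividing one Series by another does not raise ZeroDivisionError, so the zero case is masked explicitly).

```python
import numpy as np
import pandas as pd


def rolling_weighted_average(df, value_col, weight_col, window, min_periods=None):
    """Rolling weighted mean of value_col weighted by weight_col (hypothetical helper)."""
    values = df[value_col].astype("float64")
    weights = df[weight_col].astype("float64")

    weighted_sum = (values * weights).rolling(window, min_periods=min_periods).sum()
    weight_sum = weights.rolling(window, min_periods=min_periods).sum()

    # Mask windows whose weights sum to zero so the division yields NaN there,
    # then fall back to the unweighted rolling mean for those windows.
    result = weighted_sum / weight_sum.where(weight_sum != 0)
    plain_mean = values.rolling(window, min_periods=min_periods).mean()
    return result.fillna(plain_mean)


# Made-up sample: the last window has zero total weight.
sample = pd.DataFrame({"value": [10.0, 20.0, 30.0, 40.0],
                       "weight": [1.0, 3.0, 0.0, 0.0]})
result = rolling_weighted_average(sample, "value", "weight", window=2, min_periods=1)
print(result.tolist())  # → [10.0, 17.5, 20.0, 35.0]
```

Windows that do not reach min_periods stay NaN, because the unweighted rolling mean is NaN there as well.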
Upvotes: 0
Reputation: 49
Here is my solution for a rolling weighted average, using pandas' _Rolling_and_Expanding:
First, I added a new column for the product:
df['mul'] = df['value'] * df['weight']
Then write the function you would like to apply:
import pandas as pd
from pandas.core.window.rolling import _Rolling_and_Expanding
def weighted_average(x):
    # x is the rolling object, so each .sum() is a rolling sum
    d = []
    d.append(x['mul'].sum()/x['weight'].sum())
    return pd.Series(d, index=['wavg'])
_Rolling_and_Expanding.weighted_average = weighted_average
Apply the function by the following line:
result = mean_per_group.rolling(window=7).weighted_average()
Then you can get the series you wanted by:
result['wavg']
Upvotes: 4
Reputation: 1
The following code should do it (pardon my long names). It is quite simple: it takes advantage of the raw=False option that newer versions of pandas added to rolling.apply, which passes each window as a Series rather than a 1-D array, so more information can travel with it:
import numpy as np
def get_weighted_average(dataframe, window, columnname_data, columnname_weights):
    # Move the weights into the index so each window carries them along
    processed_dataframe = dataframe.loc[:, [columnname_data, columnname_weights]].set_index(columnname_weights)
    def get_mean_withweights(processed_dataframe_windowed):
        return np.average(a=processed_dataframe_windowed, weights=processed_dataframe_windowed.index)
    return processed_dataframe.rolling(window=window).apply(func=get_mean_withweights, raw=False)
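To make the index trick easier to follow, here is a small worked sketch with made-up numbers (three rows, a window of 2). The names indexed and window_average are mine, not part of the function above.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"vertical": [10.0, 20.0, 30.0],
                   "v_std": [1.0, 3.0, 1.0]})

# Move the weights into the index so they travel with each window.
indexed = df.set_index("v_std")["vertical"]


def window_average(window_series):
    # With raw=False, window_series is a Series whose index holds the weights.
    return np.average(window_series, weights=window_series.index)


result = indexed.rolling(window=2).apply(window_average, raw=False)

# The result is indexed by the weights, so assign it back by position.
df["wavg"] = result.to_numpy()
print(df["wavg"].tolist())  # → [nan, 17.5, 22.5]
```

Since the function is called once per window in Python, this is likely to be slower on a large timeseries than dividing two rolling sums.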
Upvotes: 0
Reputation: 71
This is how I implemented a weighted mean. It would be nice if there were a pairwise_apply for this sort of thing.
from pandas.core.window import _flex_binary_moment, _Rolling_and_Expanding
def weighted_mean(self, weights, **kwargs):
    weights = self._shallow_copy(weights)
    window = self._get_window(weights)
    def _get_weighted_mean(X, Y):
        X = X.astype('float64')
        Y = Y.astype('float64')
        sum_f = lambda x: x.rolling(window, self.min_periods, center=self.center).sum(**kwargs)
        return sum_f(X * Y) / sum_f(Y)
    return _flex_binary_moment(self._selected_obj, weights._selected_obj,
                               _get_weighted_mean, pairwise=True)
_Rolling_and_Expanding.weighted_mean = weighted_mean
df['mean'] = df['vertical'].rolling(window = 7).weighted_mean(df['v_std'])
Upvotes: 3
Reputation: 9
I believe you may be looking for the win_type parameter of rolling(). You can specify different types of windows, like 'triang' (triangular) ...
See the parameter's documentation at https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.rolling.html
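One caveat worth illustrating: as far as I understand, win_type applies one fixed window shape to every window (and needs SciPy installed), so the weights depend on the position inside the window, not on a per-row column like v_std. The sketch below imitates that behaviour with an explicit weight array instead of win_type; the series and the fixed_weights values are made up.

```python
import numpy as np
import pandas as pd

s = pd.Series([1.0, 2.0, 4.0, 8.0])

# The same three weights are reused for every window (triangular shape).
fixed_weights = np.array([0.5, 1.0, 0.5])

fixed = s.rolling(3).apply(
    lambda x: np.dot(x, fixed_weights) / fixed_weights.sum(),
    raw=True,  # x arrives as a plain NumPy array
)
print(fixed.tolist())  # → [nan, nan, 2.25, 4.5]
```

With weights that change per day, as in the question, a fixed window like this would not reproduce the requested average.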
Upvotes: -1