Reputation: 1622
I have created a pandas dataframe as follows:
import pandas as pd
import numpy as np
# Four trends with 4, 2, 6 and 5 rows respectively; rows are already
# ordered by trend.
ds = {
    'trend': [1] * 4 + [2] * 2 + [3] * 6 + [4] * 5,
    'price': [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1],
}
df = pd.DataFrame(ds)
The dataframe looks as follows:
# display() comes from IPython (available by default in Jupyter); in a plain
# Python script use print(df) instead.
display(df)
trend price
0 1 23
1 1 43
2 1 56
3 1 21
4 2 43
5 2 55
6 3 54
7 3 32
8 3 9
9 3 12
10 3 11
11 3 12
12 4 23
13 4 3
14 4 2
15 4 1
16 4 1
I have saved the dataframe to a .csv file called df.csv:
# Persist without the index so that re-reading the file yields only df's columns.
df.to_csv(path_or_buf="df.csv", index=False)
I have then created a function that calculates the Relative Strength Index (RSI - see: https://www.investopedia.com/terms/r/rsi.asp):
def get_RSI(df, column, time_window):
    """Return the RSI indicator of ``df[column]`` for the given time window.

    Side effect: the RSI series is also written into ``df['RSI']``.

    Returns a DataFrame holding only the ``RSI`` column. For a column without
    missing values, the first ``time_window`` rows are NaN because of
    ``min_periods``.
    """
    delta = df[column].diff(1)
    # 0 * delta keeps diff's leading NaN in place; mask() then copies the
    # moves of the matching sign over those zeros.
    zeros = 0 * delta
    gains = zeros.mask(delta > 0, delta)
    losses = zeros.mask(delta < 0, delta)
    # com = time_window - 1 gives the decay alpha = 1 / time_window.
    smoothing = {'com': time_window - 1, 'min_periods': time_window}
    avg_gain = gains.ewm(**smoothing).mean()
    avg_loss = losses.ewm(**smoothing).mean()
    df['RSI'] = 100 - 100 / (1 + (avg_gain / avg_loss).abs())
    return df[['RSI']]
I need to create a new field called RSI which, at each iteration, is calculated from:
- the price observed at that iteration, and
- the last prices observed in the previous trends (the RSI length is 3 in this example).

For example:
I have then written this code:
rsi = []
for i in range(len(df)):
    # Rows 0..i of the in-memory frame. Assuming df.csv was written from this
    # df, this is the same data pd.read_csv("df.csv", nrows=i+1) returned,
    # without re-reading the file from disk on every iteration.
    ds = df.iloc[:i + 1]
    # Last price of every trend seen so far; groupby sorts by trend, so the
    # final row is the current trend (assumes trends appear in ascending order).
    d = ds.groupby(['trend'], as_index=False).agg({'price': 'last'})
    get_RSI(d, 'price', 3)  # writes d['RSI'] in place
    rsi.append(d['RSI'].iloc[-1])
df['RSI'] = rsi
The dataset looks correct:
# display() comes from IPython (available by default in Jupyter); in a plain
# Python script use print(df) instead.
display(df)
trend price RSI
0 1 23 NaN
1 1 43 NaN
2 1 56 NaN
3 1 21 NaN
4 2 43 NaN
5 2 55 NaN
6 3 54 NaN
7 3 32 NaN
8 3 9 NaN
9 3 12 NaN
10 3 11 NaN
11 3 12 NaN
12 4 23 47.667343
13 4 3 28.631579
14 4 2 28.099174
15 4 1 27.586207
16 4 1 27.586207
The problem is that I need to process about 4 million records and it would take approximately 60 hours.
Does anyone know how to get the same results in a quick, efficient way, please?
Upvotes: 1
Views: 348
Reputation: 4823
Instead of reading the file on each line in a loop, it is better to do this:
# Slice the in-memory frame instead of re-reading the CSV. With a default
# RangeIndex (assumed here), .loc[:i] is label-inclusive, i.e. the same first
# i + 1 rows that pd.read_csv("df.csv", nrows=i+1) returned.
ds = df.loc[:i]
Update 27.07.2024: I partially reused what dydev did with range_group, removed the per-row call, and switched to vectorized operations. As a result, 4 million rows are processed in about two minutes. This works only if the trends are ordered (1, 2, 3, etc.), not e.g. 1, 2, 3, 1. The previous version has been moved below.
Dataframe generation
n = 15
# Random trend labels in [1, 6] and prices in [1, 100).
trend = np.random.randint(1, 7, size=n)
price = np.random.uniform(1, 100, size=n)
# The calculation below needs rows grouped by trend in ascending order,
# with a fresh 0..n-1 index.
df = (
    pd.DataFrame({'trend': trend, 'price': price})
    .sort_values('trend', ignore_index=True)
)
The calculation code itself:
time_window = 3

# O(rows + trends) version of the RSI calculation.
# Every row of trend i sees the same history, namely the closes of trends
# 0..i-1. So the decayed gain/loss sums over that history are built once per
# trend, and only the final step (the current price) is done per row. This
# avoids the rows x trends matrix, which does not fit in memory for millions
# of rows and many trends.
# Assumes rows are grouped by trend in ascending order (as produced by the
# sort above) and that prices contain no NaN.
trend_values = df["trend"].to_numpy()
prices = df["price"].to_numpy(dtype=float)
n_rows = prices.size

is_start = np.ones(n_rows, dtype=bool)
is_start[1:] = trend_values[1:] != trend_values[:-1]
trend_idx = np.cumsum(is_start) - 1  # 0-based trend number of each row
is_end = np.ones(n_rows, dtype=bool)
is_end[:-1] = is_start[1:]
closes = prices[is_end]  # last price of each trend

new_rsi = np.full(n_rows, np.nan)
# Rows of the first time_window trends lack enough history (min_periods).
rows = trend_idx >= time_window
if rows.any():
    # com = time_window - 1  ->  alpha = 1 / time_window, decay = 1 - alpha.
    decay = 1.0 - 1.0 / time_window
    steps = np.diff(closes)
    # up_sum[k] = sum_{j=1..k} decay**(k-j) * max(close_j - close_{j-1}, 0)
    up_sum = np.zeros(closes.size)
    down_sum = np.zeros(closes.size)
    for k in range(1, closes.size):
        up_sum[k] = decay * up_sum[k - 1] + max(steps[k - 1], 0.0)
        down_sum[k] = decay * down_sum[k - 1] + min(steps[k - 1], 0.0)

    prev = trend_idx[rows] - 1  # previous trend of each computed row
    move = prices[rows] - closes[prev]
    up = decay * up_sum[prev] + np.maximum(move, 0.0)
    down = decay * down_sum[prev] + np.minimum(move, 0.0)
    # The adjusted-EWM normaliser is identical for gains and losses, so it
    # cancels in the ratio. As with pandas, 0/0 -> NaN and x/0 -> inf (RSI 100).
    with np.errstate(divide="ignore", invalid="ignore"):
        rs = np.abs(up / down)
    new_rsi[rows] = 100 - 100 / (1 + rs)

df['newRSI'] = new_rsi
Previous version.
The ind column was created to pass the row indexes to map. An empty array arr is created and filled row by row in func_numpy. The rest are your operations, converted to vectorized form so that they are executed in one go instead of row by row.
I also checked that the calculated RSI values coincide with your algorithm.
df['ind'] = df.index
size_trend = df['trend'].unique().size
# arr[x] will hold, right-aligned, the last price of every trend seen in the
# rows up to label x, so the current price sits in the last column.
arr = np.full((len(df), size_trend), np.nan)


def func_numpy(x):
    """Write the per-trend last prices of ``df.loc[:x]`` into row ``x`` of ``arr``."""
    last_per_trend = df.loc[:x].groupby('trend')['price'].last().to_numpy()
    arr[x, -last_per_trend.size:] = last_per_trend


for x in df['ind']:
    func_numpy(x)

diff = np.diff(arr, axis=1)
# 0 * diff keeps NaN where diff is NaN; the other entries become 0 unless
# the move has the matching sign.
up_chg = np.where(diff > 0, diff, 0 * diff)
down_chg = np.where(diff < 0, diff, 0 * diff)
# Transpose so each record is a column and the EWM runs across its trend
# history; the last row is the value at the current price.
ewm_options = {'com': time_window - 1, 'min_periods': time_window}
up_chg_avg = pd.DataFrame(up_chg).T.ewm(**ewm_options).mean().iloc[-1].values
down_chg_avg = pd.DataFrame(down_chg).T.ewm(**ewm_options).mean().iloc[-1].values
RS = np.abs(up_chg_avg / down_chg_avg)
df['newRSI'] = 100 - 100 / (1 + RS)
Upvotes: 2
Reputation: 481
It wasn't completely clear in the question what the analysis was trying to achieve - there is a high likelihood that there's a more efficient method if this was understood.
Using the below approach I managed to get the time down from 5 hours to 35 minutes on the simulated dataset, whilst still using the same methodology.
I simulated your 4M-row dataset as follows:
import random
from tqdm import tqdm
random.seed(1337)
trends = list(range(1000))
n_samples = int(4e6)
ds = {
    # Sorted because 'trend' is assumed to act as the time indicator.
    "trend": sorted(
        trends[int(random.random() * 1000)] for _ in tqdm(range(n_samples))
    ),
    "price": [random.random() * 100 for _ in tqdm(range(n_samples))],
}
df = pd.DataFrame(data=ds)
# Keep the index in the file to track each row's position in the time series.
df.to_csv("fake_data.csv", index=True)
Before iterating over the df, I created a list of index lists (a list of lists), where each inner list selects the rows of the original df used to calculate one RSI value. We can then iterate over this list to retrieve each required df slice.
import pandas as pd
from copy import deepcopy
from tqdm import tqdm
# fake_data.csv was written with index=True; index_col=0 restores that saved
# index instead of loading it as an extra "Unnamed: 0" column.
df = pd.read_csv("./fake_data.csv", index_col=0)
time_window = 3
def get_trend_ranges(df: pd.DataFrame, trends: list) -> dict[int, tuple]:
    """
    Locate the first and last row of every requested trend in a single pass.

    trends - list of unique trend values to look up
    returns - {trend: (start_idx, end_idx)}: the index labels of the first and
              last row carrying that trend. The pair spans one contiguous run
              only if df is sorted by trend.
    raises - KeyError if a value in trends does not occur in df["trend"]
    """
    # One groupby over the index labels replaces a full boolean scan of df per
    # trend (O(rows * trends) -> O(rows)).
    bounds = (
        df.index.to_series()
        .groupby(df["trend"].to_numpy(), sort=False)
        .agg(["first", "last"])
    )
    return {
        trend: (bounds.at[trend, "first"], bounds.at[trend, "last"])
        for trend in trends
    }
def _get_idx_slices(trends: list, trends_ranges: dict, time_window) -> list[list]:
"""
trends - list of unique trend values
trends_ranges - {trend: (start_index, end idx)}
returns [[idx1, idx2, ..,idxt+1], [idx11, idx22, ..., idx2t+1],..] where t is time_window
"""
if time_window >= len(trends) or time_window <= 0:
raise Exception("Time Window not within bounds")
else:
unique_trend_analyses = len(trends) - time_window
idx_lst = []
for i in tqdm(range(unique_trend_analyses)):
fixed_t = trends[i : i + time_window]
fixed_idx = [trends_ranges[trend][1] for trend in fixed_t]
var_t = trends[i + time_window]
var_t_start_idx, var_t_stop_idx = (
trends_ranges[var_t][0],
trends_ranges[var_t][1] + 1,
)
for j in range(var_t_start_idx, var_t_stop_idx):
full_idx = deepcopy(fixed_idx)
full_idx.append(j)
idx_lst.append(full_idx)
return idx_lst
# Both helpers assume rows are grouped (sorted) by trend, so that each trend
# occupies one contiguous run of rows.
def get_idx_slices(df, time_window):
    """Return, for every row that gets an RSI value, the row indices of its slice.

    Thin wrapper: collects the unique trends in order of appearance, locates
    each trend's first/last row and delegates to ``_get_idx_slices``.
    """
    ordered_trends = df["trend"].unique().tolist()
    return _get_idx_slices(
        ordered_trends,
        get_trend_ranges(df, ordered_trends),
        time_window,
    )
Now we can iterate over the idx list to use the df slices that we require.
def get_RSI(df, time_window):
    """Return the RSI indicator for the specified time window.

    df - despite the name, a Series of price differences (e.g. prices.diff());
         a NaN entry (such as diff's leading NaN) is treated as a zero change
    time_window - RSI length; the EWM uses com=time_window - 1, i.e.
         alpha = 1 / time_window
    returns - the RSI value at the last position of the series. It is NaN if
         the series has fewer than time_window entries or has neither gains
         nor losses.
    """
    # Vectorised split into gains and losses. Anything that is not strictly
    # positive (resp. negative), including NaN, becomes 0, exactly as the
    # former element-wise lambda did, but without a Python call per element.
    up_chg = df.where(df > 0, 0)
    down_chg = df.where(df < 0, 0)
    # We set com = time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    RS = abs(up_chg_avg / down_chg_avg)
    RSI = 100 - 100 / (1 + RS)
    return RSI.iloc[-1]
# Float column: rows without an RSI (those of the first time_window trends)
# stay NaN instead of holding "", which would force an object dtype.
df["RSI"] = float("nan")
idx_lst = get_idx_slices(df, time_window)
prices = df["price"]
# One RSI per index list; the value belongs to the list's last row.
rsi_values = [
    get_RSI(prices.iloc[idx].diff(), time_window) for idx in tqdm(idx_lst)
]
# A single bulk assignment instead of one .loc write per row.
df.loc[[idx[-1] for idx in idx_lst], "RSI"] = rsi_values
Upvotes: 2