Giampaolo Levorato

Reputation: 1622

How to calculate the Relative Strength Index (RSI) by iterating over the records of a pandas dataframe

I have created a pandas dataframe as follows:

import pandas as pd
import numpy as np
    
ds = {'trend': [1,1,1,1,2,2,3,3,3,3,3,3,4,4,4,4,4], 'price': [23,43,56,21,43,55,54,32,9,12,11,12,23,3,2,1,1]}

df = pd.DataFrame(data=ds)

The dataframe looks as follows:

display(df)

   trend    price
0   1        23
1   1        43
2   1        56
3   1        21
4   2        43
5   2        55
6   3        54
7   3        32 
8   3         9
9   3        12
10  3        11
11  3        12
12  4        23
13  4         3
14  4         2
15  4         1
16  4         1

I have saved the dataframe to a .csv file called df.csv:

df.to_csv("df.csv", index = False)

I have then created a function that calculates the Relative Strength Index (RSI - see: https://www.investopedia.com/terms/r/rsi.asp):

def get_RSI(df, column, time_window):
    """Return the RSI indicator for the specified time window."""
    diff = df[column].diff(1)

    # This preserves the dimensions of the diff values.
    up_chg = 0 * diff
    down_chg = 0 * diff

    # Up change is equal to the positive difference, otherwise equal to zero.
    up_chg[diff > 0] = diff[diff > 0]

    # Down change is equal to the negative difference, otherwise equal to zero.
    down_chg[diff < 0] = diff[diff < 0]

    # We set com = time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1,
                            min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1,
                                min_periods=time_window).mean()

    RS = abs(up_chg_avg / down_chg_avg)
    df['RSI'] = 100 - 100 / (1 + RS)
    df = df[['RSI']]
    return df
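
For reference, the function implements RSI = 100 - 100 / (1 + RS), where RS is the ratio of the exponentially smoothed average gain to the exponentially smoothed average loss over the window. A minimal usage sketch (the copy is only there to avoid mutating df):

# get_RSI adds an 'RSI' column to the frame it receives and returns a
# one-column frame containing just that column.
rsi_only = get_RSI(df.copy(), 'price', 3)
print(rsi_only.tail())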

I need to create a new field called RSI which:

  1. iterates through each and every record of the dataframe
  2. calculates the RSI by considering the price observed at each iteration together with the last price observed in each of the previous trends (the RSI length is 3 in this example).

For example, for row 12 (trend 4, price 23) the prices considered are the last price of each of the three previous trends plus the current price, i.e. [21, 55, 12, 23], which gives an RSI of about 47.67, as in the output further below.

I have then written this code:

rsi = []

for i in range(len(df)):

    ds = pd.read_csv("df.csv", nrows=i+1)
    ds.info()  # info() prints by itself; wrapping it in print() just adds "None"
    d = ds.groupby(['trend'], as_index=False).agg({'price': 'last'})

    get_RSI(d,'price',3)
    rsi.append(d['RSI'].iloc[-1])

df['RSI'] = rsi

The dataset looks correct:

display(df)

  trend price   RSI
0   1   23     NaN
1   1   43     NaN
2   1   56     NaN
3   1   21     NaN
4   2   43     NaN
5   2   55     NaN
6   3   54     NaN
7   3   32     NaN
8   3   9      NaN
9   3   12     NaN
10  3   11     NaN
11  3   12     NaN
12  4   23     47.667343
13  4   3      28.631579
14  4   2      28.099174
15  4   1      27.586207
16  4   1      27.586207

The problem is that I need to process about 4 million records, and this approach would take approximately 60 hours.

Does anyone know how to get the same results in a quick, efficient way, please?

Upvotes: 1

Views: 348

Answers (2)

inquirer

Reputation: 4823

Instead of re-reading the file on every iteration of the loop, it is better to slice the dataframe that is already in memory:

ds = df.loc[:i]
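
Applied to your loop, a minimal sketch (same logic as in the question, just slicing the in-memory frame instead of re-reading the CSV each time):

rsi = []
for i in range(len(df)):
    # Rows 0..i, equivalent to pd.read_csv("df.csv", nrows=i+1).
    ds = df.loc[:i]
    d = ds.groupby(['trend'], as_index=False).agg({'price': 'last'})
    get_RSI(d, 'price', 3)
    rsi.append(d['RSI'].iloc[-1])
df['RSI'] = rsi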

Update 27.07.2024: I partially reused what dydev did with range_group, removed the per-row call, and switched to vector operations. As a result, 4 million rows are processed in about two minutes. This only works if the trends are ordered (1, 2, 3, and so on, but not 1, 2, 3, 1, for example). The previous version has been moved down.

Dataframe generation

n = 15
trend = np.random.randint(low=1, high=7, size=(n))
price = np.random.uniform(low=1, high=100, size=(n))
df = pd.DataFrame({'trend': trend, 'price': price})
df = df.sort_values(by=['trend']).reset_index(drop=True)
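
Since the method assumes ordered trends, a quick guard (a sketch using pandas' is_monotonic_increasing) before running the calculation:

# The vectorized version below requires trends in non-decreasing order.
assert df['trend'].is_monotonic_increasing, "trend column must be sorted"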

The calculation code itself:

time_window = 3
trends = df["trend"].unique()

arr = df['price'].values

# First and last row index of each trend (this is why trends must be ordered).
range_group = np.stack(
    [df[df["trend"] == trend].index.values.take([0, -1]) for trend in trends]
)
# One column per trend: the price history each row's RSI is computed from.
price = np.full((len(df), trends.size), np.nan)
# Last prices of the first time_window trends seed the history.
prev = arr[range_group[:time_window, 1]]

for i in range(time_window, len(trends)):
    stop = range_group[i, 1] + 1
    # The current trend's own prices go in the last column...
    price[range_group[i, 0]:stop, -1] = arr[range_group[i, 0]:stop]
    # ...preceded by the last prices of all trends seen before it.
    price[range_group[i, 0]:stop, -(prev.size+1):-1] = prev
    # Extend the history with this trend's final price for the next iteration.
    prev = price[range_group[i, 1], -(prev.size+1):]


# Rows before the first row of trend number time_window have no RSI.
price = price[range_group[time_window, 0]:]
diff = np.diff(price, axis=1)

up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]

up_chg_avg = pd.DataFrame(up_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1]
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1]


RS = np.abs(up_chg_avg / down_chg_avg)
df.loc[range_group[time_window, 0]:, 'newRSI'] = (100 - 100 / (1 + RS)).values
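
To double-check the result, a small sketch (assuming the question's get_RSI function is defined and the dataframe is toy-sized) that compares newRSI against the original row-by-row approach:

# Recompute RSI row by row with the question's method and compare.
rsi = []
for i in range(len(df)):
    d = df.loc[:i].groupby('trend', as_index=False).agg({'price': 'last'})
    get_RSI(d, 'price', time_window)
    rsi.append(d['RSI'].iloc[-1])

assert np.allclose(rsi, df['newRSI'], equal_nan=True)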

Previous version. The ind column is created so that row indexes can be passed through map. An empty array arr is allocated and then filled row by row in func_numpy. The rest are your operations converted to vector form, so they are no longer processed row by row but executed in one go.

I also checked that the calculated RSI values coincide with your algorithm.

df['ind'] = df.index
size_trend = df['trend'].unique().size
arr = np.full((len(df), size_trend), np.nan)

def func_numpy(x):
    # Last price of every trend seen up to row x, right-aligned into arr.
    row_last = df.loc[:x].groupby('trend')['price'].last().values
    arr[x, -row_last.size:] = row_last

# map is used purely for its side effect of filling arr row by row.
df['ind'].map(func_numpy)

diff = np.diff(arr, axis=1)

up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]

up_chg_avg = pd.DataFrame(up_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1].values

RS = np.abs(up_chg_avg / down_chg_avg)
df['newRSI'] = 100 - 100 / (1 + RS)

Upvotes: 2

dydev

Reputation: 481

It isn't completely clear from the question what the analysis is trying to achieve; there is a high likelihood that a more efficient method exists if that were understood.

Using the below approach I managed to get the time down from 5 hours to 35 minutes on the simulated dataset, whilst still using the same methodology.

I simulated your 4m dataset as follows:

import random
import pandas as pd
from tqdm import tqdm

random.seed(1337)
trends = [x for x in range(1000)]

ds = {
    # sorted as I assume the 'trend' is the time indicator.
    "trend": sorted(
        [trends[int((random.random() * 1000))] for _ in tqdm(range(int(4e6)))]
    ),
    "price": [random.random() * 100 for _ in tqdm(range(int(4e6)))],
}

df = pd.DataFrame(data=ds)
df.to_csv(
    "fake_data.csv", index=True
)  # recommend keeping index to keep track of the time series

Before iterating over the df, I created a list of index combinations (a list of lists), where each combination identifies the slice of the original df for which you are calculating the RSI. We can then iterate over this list to retrieve each required df slice.

import pandas as pd
from copy import deepcopy
from tqdm import tqdm

df = pd.read_csv("./fake_data.csv")
time_window = 3


def get_trend_ranges(df: pd.DataFrame, trends: list) -> dict[int, tuple]:
    """
    trends - list of unique trend values
    returns - start and end index of each trend (assumes df is sorted by trend)
    """
    trends_ranges = {}
    for trend in trends:
        df_trend = df[df["trend"] == trend]
        start_idx = df_trend.iloc[0].name
        end_idx = df_trend.iloc[-1].name
        trends_ranges[trend] = (start_idx, end_idx)
    return trends_ranges
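
As a quick illustration, running this against the question's original 17-row dataframe (not the simulated one) would give:

print(get_trend_ranges(df, df["trend"].unique().tolist()))
# {1: (0, 3), 2: (4, 5), 3: (6, 11), 4: (12, 16)}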


def _get_idx_slices(trends: list, trends_ranges: dict, time_window) -> list[list]:
    """
    trends - list of unique trend values
    trends_ranges - {trend: (start_index, end idx)}
    returns - a list of index lists, each of length time_window + 1
    """
    if time_window >= len(trends) or time_window <= 0:
        raise Exception("Time Window not within bounds")
    else:
        unique_trend_analyses = len(trends) - time_window
        idx_lst = []
        for i in tqdm(range(unique_trend_analyses)):
            fixed_t = trends[i : i + time_window]
            fixed_idx = [trends_ranges[trend][1] for trend in fixed_t]
            var_t = trends[i + time_window]
            var_t_start_idx, var_t_stop_idx = (
                trends_ranges[var_t][0],
                trends_ranges[var_t][1] + 1,
            )
            for j in range(var_t_start_idx, var_t_stop_idx):
                full_idx = deepcopy(fixed_idx)
                full_idx.append(j)
                idx_lst.append(full_idx)
    return idx_lst


# in order to store ranges of each trend, we assume trend is sorted.
def get_idx_slices(df, time_window):
    trends = df["trend"].unique().tolist()
    trends_ranges = get_trend_ranges(df, trends)
    idx_slices = _get_idx_slices(trends, trends_ranges, time_window)
    return idx_slices
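
For instance, on the question's original 17-row dataframe with time_window = 3, each combination holds the last rows of trends 1-3 plus one row of trend 4 (a sketch):

idx_slices = get_idx_slices(df, time_window)
print(idx_slices[0])   # [3, 5, 11, 12]
print(idx_slices[-1])  # [3, 5, 11, 16]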

Now we can iterate over the idx list to use the df slices that we require.

def get_RSI(df, time_window):
    """Return the RSI indicator for the specified time window."""

    # Keep the leading NaN from diff() (0 * NaN is NaN) so that ewm's
    # min_periods counts the same observations as in the question's version;
    # apply() with a lambda would silently turn that NaN into 0.
    up_chg = 0 * df
    down_chg = 0 * df
    up_chg[df > 0] = df[df > 0]
    down_chg[df < 0] = df[df < 0]

    # We set com = time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()

    RS = abs(up_chg_avg / down_chg_avg)
    RSI = 100 - 100 / (1 + RS)
    return RSI.iloc[-1]


df["RSI"] = ""

idx_lst = get_idx_slices(df, time_window)

for idx in tqdm(idx_lst):
    df_slice = df.iloc[idx]
    df_slice_diff = df_slice["price"].diff()

    RSI = get_RSI(df_slice_diff, time_window)
    last_idx = idx[-1]  # the row whose RSI this slice produces

    df.loc[last_idx, "RSI"] = RSI

Upvotes: 2
