Jossy
Jossy

Reputation: 1021

Speed up groupby rolling apply utilising multiple columns

I'm trying to create a Brier Score for a grouped rolling window. As the function that calculates the Brier Score utilises multiple columns in the grouped rolling window I've had to use the answer here as the basis for a rather hacky solution:

import pandas as pd
import numpy as np
from pandas._libs.tslibs.timestamps import Timestamp
import random

ROWS = 20

# create dataframe

def create_random_dates(start: Timestamp, end: Timestamp, n: int): 
    divide_by = 24*60*60*10**9
    start_u = start.value // divide_by
    end_u = end.value // divide_by
    return pd.to_datetime([random.randint(start_u, end_u) for p in range(n)], unit="D") 

random.seed(1)
start = pd.to_datetime('2015-01-01')
end = pd.to_datetime('2018-01-01')
random_dates = create_random_dates(start, end, ROWS)
df = pd.DataFrame(
    {
        "id_": list(range(ROWS)),
        "date": random_dates,
        "group": [random.randint(1, 2) for p in range(ROWS)],
        "y_true": [random.randint(0, 1) for p in range(ROWS)],
        "y_prob": [random.random() for p in range(ROWS)],
    }
)
df.sort_values(["group", "date"], inplace=True)
df.reset_index(drop=True, inplace=True)
df.reset_index(inplace=True)

# calculate brier score

def calc_brier(series: pd.Series, df: pd.DataFrame) -> float:
    df_group = df.loc[series.values]
    return np.average((df_group["y_true"].values - df_group["y_prob"].values) ** 2)

df_date_idx = df.set_index("date")
df_date_idx.drop(["id_", "y_true", "y_prob"], axis=1, inplace=True)
brier: pd.DataFrame = (
    df_date_idx
    .groupby("group", as_index=False)
    .rolling("1000d", min_periods=3, closed="left")
    .apply(calc_brier, args=(df, ))
)
df.drop("index", axis=1, inplace=True)
df["brier"] = brier["index"].values
df

This works fine with a small number of rows but takes an age once I start scaling ROWS. In my actual use case the dataframe is 1m+ rows and I've given up after a few minutes.

Would anyone have a faster solution?

Upvotes: 0

Views: 147

Answers (1)

padu
padu

Reputation: 899

You can easily achieve fast execution with parallel-pandas. In your example, I increased the number of groups from 2 to 100. Initialize parallel-pandas and use p_apply the parallel analog of the apply method

import time

from pandas._libs.tslibs.timestamps import Timestamp
import random
import pandas as pd
import numpy as np
from parallel_pandas import ParallelPandas

ROWS = 10_000


# create dataframe

def create_random_dates(start: Timestamp, end: Timestamp, n: int):
    divide_by = 24 * 60 * 60 * 10 ** 9
    start_u = start.value // divide_by
    end_u = end.value // divide_by
    return pd.to_datetime([random.randint(start_u, end_u) for p in range(n)], unit="D")


def calc_brier(series: pd.Series, df: pd.DataFrame) -> float:
    df_group = df.loc[series.values]
    return np.average((df_group["y_true"].values - df_group["y_prob"].values) ** 2)


if __name__ == '__main__':
    ParallelPandas.initialize(n_cpu=16, disable_pr_bar=0, split_factor=1)
    random.seed(1)
    start = pd.to_datetime('2015-01-01')
    end = pd.to_datetime('2018-01-01')
    random_dates = create_random_dates(start, end, ROWS)
    df = pd.DataFrame(
        {
            "id_": list(range(ROWS)),
            "date": random_dates,
            "group": [random.randint(1, 100) for p in range(ROWS)],
            "y_true": [random.randint(0, 1) for p in range(ROWS)],
            "y_prob": [random.random() for p in range(ROWS)],
        }
    )
    df.sort_values(["group", "date"], inplace=True)
    df.reset_index(drop=True, inplace=True)
    df.reset_index(inplace=True)

    # calculate brier score



    df_date_idx = df.set_index("date")
    df_date_idx.drop(["id_", "y_true", "y_prob"], axis=1, inplace=True)
    start = time.monotonic()
    brier: pd.DataFrame = (
        df_date_idx
        .groupby("group")
        .rolling("1000d", min_periods=3, closed="left")
        .p_apply(calc_brier, args=(df, ))
    )
    print(f'parallel time took: {time.monotonic() -start:.1f}')
    df.drop("index", axis=1, inplace=True)
    df["brier"] = brier["index"].values

Output: parallel time took: 0.8 s.

For 10,000 lines it took less than a second versus 7 seconds using the non-parallel apply method on my PC.

Upvotes: 1

Related Questions