Reputation: 2043
I have an interesting performance optimization problem that is currently a bottleneck in our application.
Given a DataFrame with a non-unique timestamp index and id and weight columns (events), and a Series of timestamps (observations), I have to assign each observation a random event id that happened at the given timestamp, taking the weights into account. Timestamps are clamped to the nearest minute and can be treated as a number of minutes from some starting datetime.
Test data generation:
import pandas as pd
import numpy as np
import random
from datetime import datetime as dt, timedelta as td
# typical date range is one month
start = dt(2020, 2, 1, 0, 0, 0)
end = dt(2020, 3, 1, 0, 0, 0)
# generate one event per minute
index = pd.date_range(start, end, freq='1min')
N = len(index)
events = pd.DataFrame({'id': np.arange(N), 'weight': np.random.random(N)}, index=index)
# generate some random events to simulate index duplicates
random_minutes = pd.to_datetime([start + td(minutes=random.randint(0, N)) for m in range(3*N)])
random_events = pd.DataFrame({'id': np.arange(3*N), 'weight': np.random.random(3*N)}, index=random_minutes)
events = pd.concat([events, random_events])
# observations, usually one or two orders of magnitude more records than events
observations = pd.Series([start + td(minutes=random.randint(0, N)) for m in range(10*N)])
Sample data points:
>>> print(events.sort_index().to_string())
id weight
2020-02-09 01:00:00 0 0.384927
2020-02-09 01:00:00 15 0.991314
2020-02-09 01:00:00 17 0.098999
2020-02-09 01:01:00 1 0.813859
2020-02-09 01:01:00 2 0.922601
2020-02-09 01:01:00 1 0.738795
2020-02-09 01:02:00 2 0.898842
2020-02-09 01:02:00 13 0.621904
2020-02-09 01:03:00 12 0.075857
2020-02-09 01:03:00 3 0.135762
2020-02-09 01:03:00 9 0.398885
...
>>> print(observations.sort_values().to_string())
12 2020-02-09 01:00:00
9 2020-02-09 01:00:00
44 2020-02-09 01:00:00
31 2020-02-09 01:01:00
53 2020-02-09 01:02:00
3 2020-02-09 01:02:00
6 2020-02-09 01:03:00
My current fastest solution is to groupby events by index, returning for each group a function that remembers how to sample from it. It's hard to properly vectorize this, since the number of records in each group may vary and I have to return an id based on the weights.
%%timeit
from functools import partial
# create a per-minute random function returning id according to weights
randomizers = events.groupby(level=0).apply(
    lambda s: partial(
        np.random.choice,
        s.id.values,
        p=s.weight.values / s.weight.sum()
    )
)
# for each observation, find random generator and call it
selections = randomizers.loc[observations].apply(lambda f: f())
14.7 s ± 49.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
So my question: is there a nice, faster way of doing what I need to do? The main problems I'm facing are the ones above: group sizes vary, and the id has to be drawn according to the weights. Any ideas? I'm considering numba, but maybe there are some clever solutions?
Upvotes: 4
Views: 683
Reputation: 1117
I can suggest two points on which you can gain performance here.
First, accessing the id/weight columns in groupby.apply creates new Series objects, which is expensive. If you sort the events DataFrame by date, then you can extract the required inputs much more efficiently by slicing the original ndarrays, as the small illustration below shows.
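For example (an illustration of this point only, not part of the solution): on a monotonic index, Index.get_loc resolves a duplicated label to a cheap slice, whereas on an unsorted index it falls back to a boolean mask, which is much slower to apply.
import pandas as pd
idx_sorted = pd.DatetimeIndex(['2020-02-01 00:00', '2020-02-01 00:00', '2020-02-01 00:01'])
idx_unsorted = idx_sorted[[0, 2, 1]]
t = pd.Timestamp('2020-02-01 00:00')
print(idx_sorted.get_loc(t))    # slice(0, 2, None) -> cheap ndarray slicing
print(idx_unsorted.get_loc(t))  # boolean mask array -> O(n) work per lookup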
The other point is about the RNG. The function np.random.choice is quite high level and, in addition to the cumulative distribution function it has to recompute from the weights on every call, it shows some serious overhead, maybe from thorough input checking, I'm not sure. Anyway, if you decompose this function into small steps (cdf, random number generation, inverse cdf, value mapping), you can keep things simple and precompute more, saving some time. Both methods lead to the same output if the RNG is reset with the same seed (and the inputs are processed in the same order, of course).
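In isolation, that decomposition boils down to something like this minimal sketch (hypothetical ids and weights; the same inverse-cdf lookup appears in the real code below):
import numpy as np
ids = np.array([10, 20, 30])         # hypothetical event ids
weights = np.array([0.2, 0.5, 0.3])  # hypothetical weights
cw = weights.cumsum()                # cdf, computed once per group
cw /= cw[-1]                         # normalize so cw[-1] == 1.0
def draw():
    u = np.random.rand()             # one uniform number per draw
    return ids[np.searchsorted(cw, u + 1e-35)]  # inverse cdf: map u to an id
samples = [draw() for _ in range(5)]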
With the reference code, I get the same time as you. With these two changes, the processing is about 8 times faster, not bad.
%%timeit -n 1 -r 5
sevents = events.sort_index() # ensure that get_loc below will not return a mask (slow)
seiv = sevents.id.values
sewv = sevents.weight.values
def randomizer(t):
    s = sevents.index.get_loc(t[0])  # either a slice (because of sort) or a scalar
    v = seiv[s]
    if isinstance(s, slice):
        w = sewv[s]
        cw = w.cumsum()  # cumulative weight (i.e. cdf)
        cw /= cw[-1]
        return lambda: v[np.searchsorted(cw, np.random.rand() + 1e-35)]  # inverse cdf
    else:
        return lambda: v  # only one event with this time
# create a per-minute random function returning id according to weights
randomizers = sevents.index.unique().to_frame().apply(randomizer, axis='columns', raw=True)
# for each observation, find random generator and call it
selections = randomizers.loc[observations].apply(lambda f: f())
1.67 s ± 12.4 ms per loop (mean ± std. dev. of 5 runs, 1 loop each)
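As a quick, illustrative sanity check (assuming the cell above has run), the empirical id frequencies for a single timestamp should approach the normalized weights of its group:
t = sevents.index[0]
f = randomizers.loc[t]  # per-minute sampling function
drawn = pd.Series([f() for _ in range(100000)]).value_counts(normalize=True)
grp = sevents.loc[[t]]  # all events at this timestamp
expected = grp.groupby('id').weight.sum() / grp.weight.sum()
print(drawn.sort_index())
print(expected.sort_index())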
Upvotes: 1