Reputation: 222521
I have 3 Pandas dataframes
df_a = pd.DataFrame(data={
'id': [1, 5, 3, 2],
'ts': [3, 5, 11, 14],
'other_cols': ['...'] * 4
})
df_b = pd.DataFrame(data={
'id': [2, 1, 3],
'ts': [7, 8, 15],
'other_cols': ['...'] * 3
})
df_c = pd.DataFrame(data={
'id': [154, 237, 726, 814, 528, 237, 248, 514],
'ts': [1, 2, 4, 6, 9, 10, 12, 13],
'other_cols': ['...'] * 8
})
Here is the problem I need to solve. For each id in df_a, find the corresponding id in df_b and their timestamps; let's call them ts_a and ts_b. Then take all the rows of df_c whose ts lies between min(ts_a, ts_b) and max(ts_a, ts_b) and calculate some custom function on these rows. This function can be a pd function (in 95% of the time) but it can be any python function.
Here are examples of the selected rows for each id, as (id, ts) pairs from df_c:
- id = 1: [726, 4], [814, 6]
- id = 2: [528, 9], [237, 10], [248, 12], [514, 13]
- id = 3: [248, 12], [514, 13]
The output does not really matter, so anything that can map id to f(rows for that id) would do the job.
For example, let's assume that I need to apply a simple len function to the selected rows; I would get the following results:
id | res |
---|---|
1 | 2 |
2 | 4 |
3 | 2 |
If my function is max(ts) - min(ts), the results are:
id | res |
---|---|
1 | 2 = 6 - 4 |
2 | 4 = 13 - 9 |
3 | 1 = 13 - 12 |
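To pin down the expected output, here is a hedged, naive reference implementation of the task (far too slow for the real data, shown only to make the specification concrete):
def naive_solution(df_a, df_b, df_c, f):
    res = {}
    ts_b = df_b.set_index('id')['ts']            # id -> ts lookup for df_b
    for _, row in df_a.iterrows():
        if row['id'] not in ts_b.index:
            continue                             # id missing in df_b
        lo, hi = sorted([row['ts'], ts_b[row['id']]])
        rows = df_c[df_c['ts'].between(lo, hi)]  # rows of df_c in [min, max]
        res[row['id']] = f(rows)
    return res
print(naive_solution(df_a, df_b, df_c, len))     # id 1 -> 2, id 3 -> 2, id 2 -> 4 (id 5 has no match)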
Here are the assumptions on the dataframes:
- ids in each corresponding table are unique
- the ts column of each table is sorted
- there might be an id in df_a which does not exist in df_b and vice versa (but the percentage of missed ids is less than 1%)
My working solutions
Attempt 1
- Build a dictionary id -> ts from df_b. Linear in terms of the length of df_b.
- Iterate over df_a, look each id up in that dictionary, and collect the rows ts, other_cols from df_c that lie between the two timestamps. Linear in terms of df_c as it is already sorted by ts.
Attempt 2
df = pd.concat([df_a, df_b, df_c]).sort_values(by='ts').reset_index(drop=True)
Iterate over this combined dataframe while maintaining a dictionary seen_ids (id -> index) into which you put the ids coming from tables A/B. When you meet an id that is already in this dictionary, take df.iloc[index_1:index_2], filter it to only the rows coming from C and apply the function.
Both attempts work correctly and run in log-linear time, but for my data they take ~20-30 min, which is bearable but not ideal. On top of this, there is the additional memory required to store the auxiliary data.
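For reference, a rough sketch of the idea behind Attempt 1 (simplified, and using binary search on df_c since it is sorted by ts; len stands in for the custom function):
import numpy as np
b_ts = dict(zip(df_b['id'], df_b['ts']))          # id -> ts, linear in len(df_b)
c_ts = df_c['ts'].values                          # already sorted by ts
res = {}
for a_id, a_ts in zip(df_a['id'], df_a['ts']):
    if a_id not in b_ts:
        continue                                  # the <1% of ids with no match
    lo, hi = min(a_ts, b_ts[a_id]), max(a_ts, b_ts[a_id])
    start = np.searchsorted(c_ts, lo, side='left')
    stop = np.searchsorted(c_ts, hi, side='right')
    res[a_id] = len(df_c.iloc[start:stop])        # or any other custom function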
My question to Pandas gurus
Can this be achieved with pure Pandas and be more efficient than my custom implementation?
Upvotes: 17
Views: 1409
Reputation: 50318
This problem can be efficiently solved in two parts.
The first part consists of finding the matching rows of df_a and df_b as well as the corresponding range of rows of df_c based on ts. This can be done very quickly using a parallel Numba implementation (while consuming only a fraction of the memory of the input datasets).
The second part consists of computing the user-defined functions, which are possibly Pandas ones. This latter operation is inherently slow and memory-expensive. Indeed, Pandas functions operate mainly on dataframes/series, which is not efficient here. Iterating over a Pandas dataframe containing generic pure-Python types is known to be painfully slow. Building many small dataframes is slow (there is a pretty high overhead to even create an empty dataframe) but memory-efficient. Creating one big dataframe is significantly faster, but it is clearly not memory-efficient since it forces many rows to be replicated (dozens or even hundreds of times, due to the number of items of df_c to extract per id of df_a/df_b). In the end, the fastest Pandas solution will be far slower than the optimal time (by at least one order of magnitude). Also note that parallelism will barely help here, because of the GIL preventing multithreaded code from being fast and pickling preventing multiprocessing from being fast. In addition, tools like Numba or Cython cannot help with user-defined generic pure-Python functions. AFAIK, the only way to make this part really fast and memory-efficient is simply not to apply generic Pandas functions on huge dataframes or generic pure-Python functions at all.
The first part can be done with parallel Numba (JIT compiler) code. While Numba does not support Pandas directly, Pandas mainly uses Numpy internally, which is well supported by Numba. The computation can be split into many chunks computed efficiently in parallel. The main idea is to build a fast index of df_b so as to merge df_a and df_b in linear time, and to use a binary search to find the ranges of matching rows in df_c. The resulting code is very fast. The catch is that the output format is not very convenient for part 2. Here is the code:
import numba as nb
import numpy as np
import pandas as pd
# Feel free to change the signature based on the actual types of your dataframes.
# Smaller types take less memory and tend to be faster because of that.
@nb.njit('(int64[::1], int64[::1], int64[::1], int64[::1], int64[::1])', parallel=True)
def find_matching_rows(df_a_id, df_a_ts, df_b_id, df_b_ts, df_c_ts):
# Build an index of `df_b` IDs
b_index = {df_b_id[i]: i for i in range(df_b_id.size)}
# Mark the `df_a` rows found in `df_b` (parallel)
found = np.empty(df_a_id.size, np.bool_)
for a_row in nb.prange(df_a_id.size):
a_id = df_a_id[a_row]
found[a_row] = a_id in b_index
# Count the number of valid rows (parallel)
count = 0
for a_row in nb.prange(df_a_id.size):
count += found[a_row]
# Count the number of valid items per chunk and
# the offsets of the output of each chunk (mainly parallel)
chunk_size = 32768
chunk_count = (found.size + chunk_size - 1) // chunk_size
count_by_chunk = np.empty(chunk_count, np.int32)
for i in nb.prange(chunk_count):
count_by_chunk[i] = np.sum(found[i*chunk_size:(i+1)*chunk_size])
out_offsets = np.zeros(chunk_count + 1, np.int32)
for i in range(chunk_count):
out_offsets[i+1] = out_offsets[i] + count_by_chunk[i]
assert out_offsets[chunk_count] == count
# Main chunk-based computation (parallel)
a_rows = np.empty(count, np.int32) # `df_a` indices
b_rows = np.empty(count, np.int32) # `df_b` indices
c_rows = np.empty((count, 2), np.int32) # Start/end indices
for chunk_id in nb.prange(chunk_count):
a_row_start = chunk_id * chunk_size
a_row_end = min(df_a_id.size, a_row_start + chunk_size)
offset = out_offsets[chunk_id]
for a_row in range(a_row_start, a_row_end):
# Discard ids of `df_a` not in `df_b`
if not found[a_row]:
continue
a_id = df_a_id[a_row]
b_row = b_index[a_id]
ts_a, ts_b = df_a_ts[a_row], df_b_ts[b_row]
ts_min, ts_max = min(ts_a, ts_b), max(ts_a, ts_b)
c_start_row = np.searchsorted(df_c_ts, ts_min, 'left') # Included
c_end_row = np.searchsorted(df_c_ts, ts_max, 'right') # Excluded
# If there is no row found in `df_c`
if c_start_row >= c_end_row:
c_start_row = c_end_row = -1 # Not discarded (may be useful)
# Save results
a_rows[offset] = a_row
b_rows[offset] = b_row
c_rows[offset, 0] = c_start_row
c_rows[offset, 1] = c_end_row
offset += 1
return (a_rows, b_rows, c_rows)
Here is the way to call the function:
a_rows, b_rows, c_rows = find_matching_rows(
df_a['id'].values, df_a['ts'].values,
df_b['id'].values, df_b['ts'].values,
df_c['ts'].values
)
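As a hedged illustration (not part of the answer's code): for simple reductions like the question's len and max(ts) - min(ts) examples, the returned index ranges alone are already enough, so no intermediate dataframe is required at all.
matched_ids = df_a['id'].values[a_rows]
df_c_ts = df_c['ts'].values
lengths = c_rows[:, 1] - c_rows[:, 0]               # len() per matched id (0 if empty)
valid = c_rows[:, 0] >= 0                           # ranges actually found in df_c
spans = np.zeros(len(matched_ids), dtype=df_c_ts.dtype)
spans[valid] = df_c_ts[c_rows[valid, 1] - 1] - df_c_ts[c_rows[valid, 0]]
res = pd.DataFrame({'id': matched_ids, 'len': lengths, 'span': spans})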
As seen before, generic approaches are inherently inefficient (for both speed and memory usage). One solution is to tweak your operations so as to apply them directly in the previous Numba code. This would make the overall implementation both very fast (i.e. parallel and JIT-compiled) and memory-efficient (i.e. computed on the fly -- no need for a huge temporary dataframe). That being said, Numba does not support generic pure-Python object types nor Pandas functions, so this can require some non-trivial code rework depending on your actual dataframes.
The inefficient alternative is to create a big temporary dataframe from the index-based arrays previously created by find_matching_rows. Here is an example of Numba code to do that:
@nb.njit('(int32[::1], int32[::1], int32[:,::1])')
def build_df_index(a_rows, b_rows, c_rows):
n = a_rows.size
# Count the total number of rows to be extracted from df_c
count = 0
for i in range(n):
count += c_rows[i, 1] - c_rows[i, 0]
new_a_rows = np.empty(count, np.int32)
new_b_rows = np.empty(count, np.int32)
new_c_rows = np.empty(count, np.int32)
offset = 0
for i in range(n):
for j in range(c_rows[i, 1] - c_rows[i, 0]):
new_a_rows[offset] = a_rows[i]
new_b_rows[offset] = b_rows[i]
new_c_rows[offset] = c_rows[i,0] + j
offset += 1
return (new_a_rows, new_b_rows, new_c_rows)
The resulting index arrays can be used to create the final dataframe, with df_a.iloc[new_a_rows], df_b.iloc[new_b_rows] and df_c.iloc[new_c_rows] for example. If your actual dataframes contain only uniform types, or ones that Numba supports, then you can directly generate this temporary dataframe with Numba (significantly faster than Pandas iloc, especially if done in parallel).
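Here is a hedged sketch (my own assembly, not from the code above) of that big-temporary-dataframe route, followed by a groupby so that arbitrary Pandas/Python functions can be applied per id:
new_a_rows, new_b_rows, new_c_rows = build_df_index(a_rows, b_rows, c_rows)
big = pd.concat(
    [
        df_a.iloc[new_a_rows].reset_index(drop=True).add_suffix('_a'),
        df_c.iloc[new_c_rows].reset_index(drop=True).add_suffix('_c'),
    ],
    axis=1,
)
print(big.groupby('id_a').size())                                      # the question's `len` example
print(big.groupby('id_a')['ts_c'].apply(lambda s: s.max() - s.min()))  # max(ts) - min(ts)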
Upvotes: 4
Reputation: 3001
I assume that performing look-ups in df_c is the rate-limiting step, so I re-indexed to enable direct look-ups vs. searching. I'm also assuming there is sufficient RAM: see Scaling to large datasets otherwise.
import pandas as pd
df_a = pd.DataFrame(data={ # <-- original data frames
'id': [1, 5, 3, 2],
'ts': [3, 5, 11, 14],
'other_columns': ['...'] * 4})
df_b = pd.DataFrame(data={
'id': [2, 1, 3],
'ts': [7, 8, 15],
'other_columns': ['...'] * 3})
df_c = pd.DataFrame(data={
'id': [154, 237, 726, 814, 528, 237, 248, 514],
'ts': [1, 2, 4, 6, 9, 10, 12, 13],
'other_columns': ['...'] * 8})
Combine df_a and df_b via an inner join:
# combine df_a and df_b to find end-points
xs = pd.merge(
left = df_a.set_index('id').loc[:, 'ts'],
right = df_b.set_index('id').loc[:, 'ts'],
on = 'id', sort = True, validate = 'one_to_one',)
xs['ts_lower'] = xs.min(axis=1)
xs['ts_upper'] = xs.max(axis=1)
endpoints = xs[['ts_lower', 'ts_upper']]
print(endpoints)
ts_lower ts_upper
id
1 3 8
2 7 14
3 11 15
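As a hedged aside (not part of this answer's main path), with endpoints in hand the question's len example could already be computed by slicing a ts-indexed view of df_c:
c_by_ts = df_c.set_index('ts').sort_index()['id']
res = {i: len(c_by_ts.loc[lo:hi]) for i, (lo, hi) in endpoints.iterrows()}
# id 1 -> 2, id 2 -> 4, id 3 -> 2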
Modify df_c such that its ts column is an Index. Then expand the index to include values from endpoints -- this lets us index directly into df_c, vs. searching:
# a) df_c: convert 'ts' to index
df_c = df_c.set_index('ts').sort_index().loc[:, 'id']
# b) df_c: expand the index
idx = (
df_c.index
.append(pd.Index(endpoints['ts_lower'].values))
.append(pd.Index(endpoints['ts_upper'].values))
.drop_duplicates()
.sort_values()
)
df_c = df_c.reindex(idx, method = 'ffill')
Navigate df_c via direct look-up:
endpoints['val_lower'] = df_c[endpoints['ts_lower']].values
endpoints['val_upper'] = df_c[endpoints['ts_upper']].values
print(endpoints)
ts_lower ts_upper val_lower val_upper
id
1 3 8 237 814
2 7 14 814 514
3 11 15 237 514
Upvotes: 1
Reputation: 1158
Here is my latest attempt. I think it is pretty fast but of course the speed depends entirely on the contents of the tables you try it on. Let me know how it works for you.
Synthetic data generation:
import random
import pandas as pd
a_len = int(1e7)
c_len = int(1e8)
df_a = pd.DataFrame(data={
'id': random.sample(population=range(a_len), k=int(a_len * .99)),
'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
'other_cols': ['...'] * int(a_len * .99)
})
df_a.sort_values(by=["ts"], inplace=True)
df_b = pd.DataFrame(data={
'id': random.sample(population=range(a_len), k=int(a_len * .99)),
'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
'other_cols': ['...'] * int(a_len * .99)
})
df_b.sort_values(by=["ts"], inplace=True)
df_c = pd.DataFrame(data={
'id': range(c_len),
'ts': random.choices(population=range(int(a_len * 1e7)), k=c_len),
'other_cols': ['...'] * c_len
})
df_c.sort_values(by=["ts"], inplace=True)
Some stats on an example generation of these tables are:
size_by_id = df_c_labeled.groupby(by=["id"]).size()
size_by_id.max()
>>> 91
size_by_id.median()
>>> 26.0
The algorithm, utilizing pandas.IntervalIndex:
import functools
import numpy as np
import pandas as pd
def cartesian_product(*arrays):
"""https://stackoverflow.com/a/11146645/7059681"""
la = len(arrays)
dtype = np.result_type(*arrays)
arr = np.empty([len(a) for a in arrays] + [la], dtype=dtype)
for i, a in enumerate(np.ix_(*arrays)):
arr[...,i] = a
return arr.reshape(-1, la).T
# inner join on id
df_ts = pd.merge(
left=df_a[["id", "ts"]],
right=df_b[["id", "ts"]],
how="inner",
on="id",
suffixes=["_a", "_b"]
)
# a = min ts, b = max ts
df_ts["ts_a"], df_ts["ts_b"] = (
df_ts[["ts_a", "ts_b"]].min(axis=1),
df_ts[["ts_a", "ts_b"]].max(axis=1),
)
a_min = df_ts["ts_a"].min()
b_max = df_ts["ts_b"].max()
interval_index = pd.IntervalIndex.from_arrays(
left=df_ts["ts_a"],
right=df_ts["ts_b"],
closed="both",
)
# rename to avoid collisions
df_c.rename(columns={"id": "id_c", "ts": "ts_c"}, inplace=True)
ts_c = df_c["ts_c"].to_numpy()
df_c_idxs_list, df_ts_idxs_list = [], []
# the first item in ts_c that is at least equal to a_min
c_lo = 0
while ts_c[c_lo] < a_min:
c_lo += 1
c_idx = c_lo
c_hi = len(ts_c)
while c_lo < c_hi and ts_c[c_lo] <= b_max:
# the index of the next greatest ts in ts_c
# depending on how many duplicate values you have in ts_c,
# it may be faster to binary search instead of incrementing one by one
# c_idx = bisect.bisect_right(a=ts_c, x=ts_c[c_lo], lo=c_idx, hi=c_hi)
while c_idx < c_hi and ts_c[c_idx] == ts_c[c_lo]:
c_idx += 1
# the indices of the intervals containing ts_c[c_lo]
unique_ts_idxs = np.where(interval_index.contains(ts_c[c_lo]))[0]
# the indices of all rows whose ts equals ts_c[c_lo]
unique_c_idxs = df_c.iloc[c_lo: c_idx].index
# all the pairs of these indices
c_idxs, ts_idxs = cartesian_product(unique_c_idxs, unique_ts_idxs)
df_c_idxs_list.append(c_idxs)
df_ts_idxs_list.append(ts_idxs)
c_lo = c_idx
df_c_idxs = np.concatenate(df_c_idxs_list)
df_ts_idxs = np.concatenate(df_ts_idxs_list)
df_c_labeled = pd.concat(
[
df_ts.loc[df_ts_idxs, :].reset_index(drop=True),
df_c.loc[df_c_idxs, :].reset_index(drop=True)
],
axis=1
)
print(df_c_labeled)
id ts_a ts_b id_c ts_c other_cols
0 1 3 8 726 4 ...
1 1 3 8 814 6 ...
2 2 7 14 528 9 ...
3 2 7 14 237 10 ...
4 3 11 15 248 12 ...
5 2 7 14 248 12 ...
6 3 11 15 514 13 ...
7 2 7 14 514 13 ...
Now we can just do some groupby stuff:
id_groupby = df_c_labeled.groupby(by="id")
id_groupby["ts_c"].size()
id
1 2
2 4
3 2
Name: ts_c, dtype: int64
id_groupby["ts_c"].max() - id_groupby["ts_c"].min()
id
1 2
2 4
3 1
Name: ts_c, dtype: int64
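As a hedged addition, any custom Python function (the question's general case) can be plugged in via apply on the same groups; my_custom_function below is just a placeholder:
def my_custom_function(group):
    return group["ts_c"].max() - group["ts_c"].min()
print(id_groupby.apply(my_custom_function))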
Upvotes: 6
Reputation: 16561
To add one possible optimisation to the existing answers: if there are duplicates in (min, max) combinations, then you could perform the lookup/calculation in df_c only for the unique (min, max) values (or alternatively implement caching).
This could be a substantial reduction in computation if the timestamps are at a fairly low resolution (e.g. days), but probably of not much use if timestamps are at high resolution (e.g. picoseconds). Of course, if you want fast approximate answers, you could always round the timestamps to a tolerable margin of error.
In practice, this would look as follows:
from pandas import DataFrame, merge
df_a = DataFrame(
data={"id": [1, 5, 3, 2], "ts": [3, 5, 11, 14], "other_cols": ["..."] * 4}
)
df_b = DataFrame(data={"id": [2, 1, 3], "ts": [7, 8, 15], "other_cols": ["..."] * 3})
df_c = DataFrame(
data={
"id": [154, 237, 726, 814, 528, 237, 248, 514],
"ts": [1, 2, 4, 6, 9, 10, 12, 13],
"other_cols": ["..."] * 8,
}
)
# indexing and min/max are adapted from the answers by @srinath, @ringo and @BeRT2me
df_a = df_a.set_index("id")["ts"] # keep only info of interest
df_b = df_b.set_index("id")["ts"] # keep only info of interest
df = merge(df_a, df_b, how="inner", left_index=True, right_index=True)
df["min"] = df[["ts_x", "ts_y"]].min(axis=1)
df["max"] = df[["ts_x", "ts_y"]].max(axis=1)
df = df[["min", "max"]]
# find unique min-max combinations (drop index to avoid confusion)
unique = df.drop_duplicates().reset_index(drop=True)
# proceed to actual calculations (below is just an example)
# make sure df_c is indexed by ts so we can lookup
df_c = df_c.set_index("ts").sort_index()
# if computation is costly this can be done in parallel, but
# AFAIK this would require using another library, e.g. dask
for tmin, tmax in unique.values:
sub = df_c.loc[tmin:tmax]
print(tmin, tmax, len(sub))
# 3 8 2
# 11 15 2
# 7 14 4
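A hedged sketch (not in the original loop above) of mapping the de-duplicated results back to the ids, with len as a stand-in for the costly custom function:
results = {(tmin, tmax): len(df_c.loc[tmin:tmax]) for tmin, tmax in unique.values}
res_per_id = df.apply(lambda row: results[(row["min"], row["max"])], axis=1)
print(res_per_id.sort_index())  # expected: 2, 4, 2 for ids 1, 2, 3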
Upvotes: 1
Reputation: 13242
# Set some indices, note how df_c is different.
df_a = df_a.set_index('id')
df_b = df_b.set_index('id')
# Looks like maybe your `ts` is already sorted? If so, `sort_index()` isn't necessary~
df_c = df_c.set_index('ts').sort_index()
# concat them together, then get the min and max from each ts.
df = pd.concat([df_a, df_b])
# Producing the min/max this way should be fast.
# sort=False is optional for performance and means your output will be jumbled like shown below~
df = df.groupby(level=-1, sort=False)['ts'].agg(['min', 'max'])
# Making this work with `raw=True` should improve performance.
# Being able to use `loc` should help.
out = df.apply(lambda x: df_c.loc[x[0]:x[1], 'id'].to_dict(), axis=1, raw=True)
print(out)
Output:
id
1 {4: 726, 6: 814}
5 {}
3 {12: 248, 13: 514}
2 {9: 528, 10: 237, 12: 248, 13: 514}
dtype: object
I don't have a ton of faith in this method, but I'd love to know how it turns out~
After setting and sorting (where necessary) indices, the one-liner would be:
# Only concating `ts` will be faster, no need to drag everything along.
out = (pd.concat([df_a[['ts']], df_b[['ts']]])
.groupby(level=-1, sort=False)['ts']
.agg(['min', 'max'])
.apply(lambda x: df_c.loc[x[0]:x[1], 'id'].to_dict(), axis=1, raw=True)
# See this alternative if only ts are needed:
#.apply(lambda x: set(df_c.loc[x[0]:x[1], 'id'].index), axis=1, raw=True)
)
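A hedged follow-up: the question's example reductions can then be read directly off out, e.g.:
print(out.apply(len))                                       # number of matching df_c rows per id
print(out.apply(lambda d: max(d) - min(d) if d else None))  # max(ts) - min(ts), None if no rows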
Upvotes: 3
Reputation: 2998
I agree with @QuangHong: it may not be efficient for data this large. However, I tried your sample input using Pandas.
Merge df_a and df_b on the id column. I did an inner join as we need the rows which are present in both:
df_merge_a_b = df_a.merge(df_b, on=['id'], how='inner')
Find the minimum and maximum timestamps of the corresponding rows:
df_merge_a_b["min_ab"] = df_merge_a_b[["ts_x", "ts_y"]].min(axis=1)
df_merge_a_b["max_ab"] = df_merge_a_b[["ts_x", "ts_y"]].max(axis=1)
With the min and max in place, for each row of the merged dataframe, find the rows of df_c whose ts is between min and max:
def get_matching_rows(row):
min_ab = row["min_ab"]
max_ab = row["max_ab"]
result = df_c[df_c["ts"].between(min_ab, max_ab)]
print(result)
## apply custom function on result and return
df_merge_a_b.apply(lambda x: get_matching_rows(x), axis=1)
Sample output
id ts other_cols
2 726 4 ...
3 814 6 ...
id ts other_cols
6 248 12 ...
7 514 13 ...
id ts other_cols
4 528 9 ...
5 237 10 ...
6 248 12 ...
7 514 13 ...
Apply the custom function on each result and concat all the outputs together. It may not be super efficient, but I wanted to try the sample in Pandas.
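A hedged variation (with len standing in for the custom function): returning a value instead of printing makes it easy to collect everything into a single column:
def apply_custom_function(row):
    rows_c = df_c[df_c["ts"].between(row["min_ab"], row["max_ab"])]
    return len(rows_c)  # replace with any custom function of the matched rows
df_merge_a_b["res"] = df_merge_a_b.apply(apply_custom_function, axis=1)
print(df_merge_a_b[["id", "res"]])
#    id  res
# 0   1    2
# 1   3    2
# 2   2    4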
Upvotes: 3