TomNorway

Reputation: 3162

Replacing a pivot with a lazy group_by operation

I'm pivoting a rather large dataframe of shape (10_000_000, 678) into one of approximately shape (770_000, 8_789) to create a dataset for an ML algorithm. It's a relatively slow operation that takes about half an hour on the high-RAM cluster I am using, and I'm wondering if there is a way to speed it up. Here is a minimal example, with a larger one below:

import polars as pl
import numpy as np

data = {
    "ID": [1,1,1,2,2,2,3,3,3],
    "rank": [1,2,3,1,2,3,1,2,3], # rank is always repeating 1-3 (or 0-12 in large example)
    "A": np.random.random((9)),
    "B": np.random.random((9)),
}

df = pl.DataFrame(data)
df_pivot = df.pivot(on="rank", index="ID")
# df_pivot
shape: (3, 7)
┌─────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┐
│ ID  ┆ A_1      ┆ A_2      ┆ A_3      ┆ B_1      ┆ B_2      ┆ B_3      │
│ --- ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---      │
│ i64 ┆ f64      ┆ f64      ┆ f64      ┆ f64      ┆ f64      ┆ f64      │
╞═════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╡
│ 1   ┆ 0.999998 ┆ 0.778061 ┆ 0.348383 ┆ 0.299838 ┆ 0.779631 ┆ 0.577527 │
│ 2   ┆ 0.805992 ┆ 0.222069 ┆ 0.319605 ┆ 0.155366 ┆ 0.111521 ┆ 0.046851 │
│ 3   ┆ 0.13981  ┆ 0.192265 ┆ 0.525983 ┆ 0.138687 ┆ 0.853611 ┆ 0.618823 │
└─────┴──────────┴──────────┴──────────┴──────────┴──────────┴──────────┘

The Polars pivot code states in a comment:

Polars lazy does not implement a pivot because it is impossible to know the schema without materializing the whole dataset. This makes a pivot quite a terrible operation for performant workflows. An optimization can never be pushed down past a pivot.

And in the groupby.pivot code:

Polars'/arrow memory is not ideal for transposing operations like pivots. If you have a relatively large table, consider using a groupby over a pivot.
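
For reference, and to make that suggestion concrete, the small example above can be expressed as a group_by that runs in lazy mode, at least when the set of ranks is known up front. This is only a sketch that assumes rank takes exactly the values 1-3; I don't know whether it would actually beat the eager pivot at scale.

import polars as pl  # df is the small example frame from above

known_ranks = [1, 2, 3]
value_cols = ["A", "B"]

df_grouped = (
    df.lazy()
    .group_by("ID", maintain_order=True)
    .agg(
        # one output column per (value column, rank), named like pivot's output
        pl.col(c).filter(pl.col("rank") == r).first().alias(f"{c}_{r}")
        for c in value_cols
        for r in known_ranks
    )
    .collect()
)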

Some questions:

- Is it possible to replace the pivot with a (lazy) group_by operation, as the comments above suggest, and would that be faster?
- I actually do know the schema ahead of time, since in my situation rank is a known series ([1, 2, 3] in the example, 0-12 below). If a lazy pivot where one can supply the schema were implemented, would it be more performant than the eager one?

Here is the much larger example:

# Much larger example, but with N_ROWS = 10_000 instead of 10_000_000
# N_ROWS = 10_000 runs in 3 seconds, N_ROWS = 100_000 runs in 40 seconds (M1 MacBook)

from string import ascii_lowercase
import polars as pl
import numpy as np

ranks = np.arange(13)
N_ROWS = 10_000 # this could be ~10_000_000

df = (pl.DataFrame({"ID": np.arange(N_ROWS)})).join(
    pl.DataFrame({"rank": ranks}), how="cross"
)

# create 26**2 dummy column names
column_names = []
for letter1 in ascii_lowercase:
    for letter2 in ascii_lowercase:
        column_names.append(letter1 + letter2)

# stack frames to create: ID, ranks, aa, ab, ..., zz
df = df.hstack(
    pl.DataFrame({letter: np.random.random(len(df)) for letter in column_names})
)

df_pivot = df.pivot(on="rank", index="ID")

Upvotes: 4

Views: 2923

Answers (1)

user18559875

How about a non-lazy solution that brings the wall-clock time on your much larger example (with N_ROWS = 1_000_000) down from over 7 minutes to around 10 seconds?

The Algorithm

We're going to take advantage of the structure of the data: after sorting by rank (and then ID), each rank's values form one contiguous block of N_ROWS rows, so every pivoted column is simply a slice of the corresponding series. (Slices are nearly free.)

I've also added an ID column with dtype Int64 so that we can use equals to compare the results of this algorithm to the output of the pivot code from the example.

Note that the algorithm is not in Lazy mode.

ser_slices = [
    s.slice(rank * N_ROWS, N_ROWS).alias(s.name + "_" + str(rank))
    for s in df.sort("rank", "ID")[:, 2:]  # every column except ID and rank
    for rank in range(0, 13)                # each rank is one contiguous block after the sort
]

result = (
    pl.DataFrame(ser_slices)
    .with_row_index('ID')                   # recreate the ID column (0 .. N_ROWS-1)
    .with_columns(
        pl.col('ID').cast(pl.Int64)         # match the dtype of the original ID column
    )
)
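
If you do need to stay in lazy mode, roughly the same slice idea can be written against a LazyFrame. The following is only a sketch: it assumes a recent Polars version in which pl.concat(..., how="horizontal") accepts LazyFrames, and I haven't benchmarked it against the eager version above.

# Lazy sketch of the same idea: one frame-level slice per rank, concatenated
# horizontally. Assumes pl.concat(..., how="horizontal") accepts LazyFrames.
value_cols = [c for c in df.columns if c not in ("ID", "rank")]
sorted_lf = df.lazy().sort("rank", "ID")

wide_lf = pl.concat(
    [
        # each rank's rows form one contiguous block of N_ROWS values
        sorted_lf.slice(rank * N_ROWS, N_ROWS).select(
            pl.exclude("ID", "rank").name.suffix(f"_{rank}")
        )
        for rank in range(13)
    ],
    how="horizontal",
)

result_lazy = (
    wide_lf.with_row_index("ID")
    .with_columns(pl.col("ID").cast(pl.Int64))
    # reorder to match the pivot's column order (aa_0 .. aa_12, ab_0, ...)
    .select(["ID"] + [f"{c}_{r}" for c in value_cols for r in range(13)])
    .collect()
)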

Performance Comparison

Let's compare the performance and output of the algorithm above with the pivot code in your example.

We'll use your larger example with N_ROWS = 1_000_000.

The Algorithm Above (Slices in Eager Mode)

If you watch the performance of your CPU on this algorithm (e.g., in top on Linux), you'll notice that the algorithm runs heavily in parallel.

import time
start = time.perf_counter()
ser_slices = [
    s.slice(rank * N_ROWS, N_ROWS).alias(s.name + "_" + str(rank))
    for s in df.sort("rank", "ID")[:, 2:]
    for rank in range(0, 13)
]

result = (
    pl.DataFrame(ser_slices)
    .with_row_index('ID')
    .with_columns(
        pl.col('ID').cast(pl.Int64)
    )
)
result
print(time.perf_counter() - start)
shape: (1000000, 8789)
┌────────┬──────────┬──────────┬──────────┬─────┬──────────┬──────────┬──────────┬──────────┐
│ ID     ┆ aa_0     ┆ aa_1     ┆ aa_2     ┆ ... ┆ zz_9     ┆ zz_10    ┆ zz_11    ┆ zz_12    │
│ ---    ┆ ---      ┆ ---      ┆ ---      ┆     ┆ ---      ┆ ---      ┆ ---      ┆ ---      │
│ i64    ┆ f64      ┆ f64      ┆ f64      ┆     ┆ f64      ┆ f64      ┆ f64      ┆ f64      │
╞════════╪══════════╪══════════╪══════════╪═════╪══════════╪══════════╪══════════╪══════════╡
│ 0      ┆ 0.702774 ┆ 0.250239 ┆ 0.023121 ┆ ... ┆ 0.348179 ┆ 0.530304 ┆ 0.380147 ┆ 0.194915 │
│ 1      ┆ 0.184479 ┆ 0.562245 ┆ 0.038145 ┆ ... ┆ 0.575752 ┆ 0.254793 ┆ 0.126996 ┆ 0.557823 │
│ 2      ┆ 0.432553 ┆ 0.111145 ┆ 0.937674 ┆ ... ┆ 0.493157 ┆ 0.843966 ┆ 0.6257   ┆ 0.044151 │
│ 3      ┆ 0.607535 ┆ 0.389257 ┆ 0.864887 ┆ ... ┆ 0.765563 ┆ 0.312805 ┆ 0.085054 ┆ 0.4972   │
│ ...    ┆ ...      ┆ ...      ┆ ...      ┆ ... ┆ ...      ┆ ...      ┆ ...      ┆ ...      │
│ 999996 ┆ 0.101384 ┆ 0.918382 ┆ 0.024    ┆ ... ┆ 0.643435 ┆ 0.905557 ┆ 0.8266   ┆ 0.460866 │
│ 999997 ┆ 0.164607 ┆ 0.766515 ┆ 0.565382 ┆ ... ┆ 0.493534 ┆ 0.595359 ┆ 0.601306 ┆ 0.637546 │
│ 999998 ┆ 0.213503 ┆ 0.874676 ┆ 0.165461 ┆ ... ┆ 0.676855 ┆ 0.730082 ┆ 0.9647   ┆ 0.710811 │
│ 999999 ┆ 0.246028 ┆ 0.963617 ┆ 0.065186 ┆ ... ┆ 0.1091   ┆ 0.913634 ┆ 0.425842 ┆ 0.715304 │
└────────┴──────────┴──────────┴──────────┴─────┴──────────┴──────────┴──────────┴──────────┘
>>> print(time.perf_counter() - start)
10.33561857099994

Roughly 10 seconds. Not bad.

Pivot (from the example code)

If you watch your CPU monitor, you'll notice that the pivot code is largely single-threaded.

import time
start = time.perf_counter()
df_pivot = df.pivot(on="rank", index="ID")
df_pivot
print(time.perf_counter() - start)
shape: (1000000, 8789)
┌────────┬──────────┬──────────┬──────────┬─────┬──────────┬──────────┬──────────┬──────────┐
│ ID     ┆ aa_0     ┆ aa_1     ┆ aa_2     ┆ ... ┆ zz_9     ┆ zz_10    ┆ zz_11    ┆ zz_12    │
│ ---    ┆ ---      ┆ ---      ┆ ---      ┆     ┆ ---      ┆ ---      ┆ ---      ┆ ---      │
│ i64    ┆ f64      ┆ f64      ┆ f64      ┆     ┆ f64      ┆ f64      ┆ f64      ┆ f64      │
╞════════╪══════════╪══════════╪══════════╪═════╪══════════╪══════════╪══════════╪══════════╡
│ 0      ┆ 0.702774 ┆ 0.250239 ┆ 0.023121 ┆ ... ┆ 0.348179 ┆ 0.530304 ┆ 0.380147 ┆ 0.194915 │
│ 1      ┆ 0.184479 ┆ 0.562245 ┆ 0.038145 ┆ ... ┆ 0.575752 ┆ 0.254793 ┆ 0.126996 ┆ 0.557823 │
│ 2      ┆ 0.432553 ┆ 0.111145 ┆ 0.937674 ┆ ... ┆ 0.493157 ┆ 0.843966 ┆ 0.6257   ┆ 0.044151 │
│ 3      ┆ 0.607535 ┆ 0.389257 ┆ 0.864887 ┆ ... ┆ 0.765563 ┆ 0.312805 ┆ 0.085054 ┆ 0.4972   │
│ ...    ┆ ...      ┆ ...      ┆ ...      ┆ ... ┆ ...      ┆ ...      ┆ ...      ┆ ...      │
│ 999996 ┆ 0.101384 ┆ 0.918382 ┆ 0.024    ┆ ... ┆ 0.643435 ┆ 0.905557 ┆ 0.8266   ┆ 0.460866 │
│ 999997 ┆ 0.164607 ┆ 0.766515 ┆ 0.565382 ┆ ... ┆ 0.493534 ┆ 0.595359 ┆ 0.601306 ┆ 0.637546 │
│ 999998 ┆ 0.213503 ┆ 0.874676 ┆ 0.165461 ┆ ... ┆ 0.676855 ┆ 0.730082 ┆ 0.9647   ┆ 0.710811 │
│ 999999 ┆ 0.246028 ┆ 0.963617 ┆ 0.065186 ┆ ... ┆ 0.1091   ┆ 0.913634 ┆ 0.425842 ┆ 0.715304 │
└────────┴──────────┴──────────┴──────────┴─────┴──────────┴──────────┴──────────┴──────────┘
>>> print(time.perf_counter() - start)
442.1277434679996

Over 7 minutes. (Given that, I didn't bother timing either approach with N_ROWS = 10_000_000.)

Comparison of the output

Do they produce the same result?

>>> result.equals(df_pivot)
True

Upvotes: 1
