BoreBoar

Reputation: 2729

Groupby and cut on a Lazy DataFrame in Polars

import numpy as np
import polars as pl

def cut(_df):
    # Series.cut returns a DataFrame with columns ["x", "break_point", "category"];
    # "x" comes back as Float64, so cast it to Int64 to join on the original column.
    _c = _df["x"].cut(bins).with_columns(pl.col("x").cast(pl.Int64))
    return _df.join(_c, on="x")

groups = ["A"]*500 + ["B"]*500
bins = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
x = np.arange(0,1000)
np.random.shuffle(x)
df = pl.DataFrame({"x":x,"group":groups})
with pl.StringCache():
    res = df.groupby("group").apply(cut)

df
Out[4]: 
shape: (1_000, 2)
┌─────┬───────┐
│ x   ┆ group │
│ --- ┆ ---   │
│ i64 ┆ str   │
╞═════╪═══════╡
│ 105 ┆ A     │
│ 166 ┆ A     │
│ 291 ┆ A     │
│ 183 ┆ A     │
│ …   ┆ …     │
│ 949 ┆ B     │
│ 891 ┆ B     │
│ 831 ┆ B     │
│ 535 ┆ B     │
└─────┴───────┘
res
Out[5]: 
shape: (1_000, 4)
┌─────┬───────┬─────────────┬─────────────────┐
│ x   ┆ group ┆ break_point ┆ category        │
│ --- ┆ ---   ┆ ---         ┆ ---             │
│ i64 ┆ str   ┆ f64         ┆ cat             │
╞═════╪═══════╪═════════════╪═════════════════╡
│ 2   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ 3   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ 4   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ 6   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ …   ┆ …     ┆ …           ┆ …               │
│ 991 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
│ 993 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
│ 996 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
│ 997 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
└─────┴───────┴─────────────┴─────────────────┘

Is there a way to do the above with a Polars lazy DataFrame without using apply or map? My end goal is to scan a large CSV, transform it, and sink it using sink_parquet. I get the following error when I use map or apply to cut the lazy DataFrame.
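
Roughly, the failing pipeline looks like this (a sketch; the file names are placeholders and cut is the function above), which raises the error below:

lazy = pl.scan_csv("large.csv")
lazy.map(cut).sink_parquet("out.parquet")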

PanicException: sink_parquet not yet supported in standard engine. 

Upvotes: 1

Views: 811

Answers (1)

jqurious

Reputation: 21154

I got it to work using a streamable UDF (LazyFrame.map with streamable=True):

df = pl.from_repr("""
┌─────┬───────┐
│ x   ┆ group │
│ --- ┆ ---   │
│ i64 ┆ str   │
╞═════╪═══════╡
│ 8   ┆ A     │
│ 1   ┆ A     │
│ 7   ┆ A     │
│ 4   ┆ A     │
│ 0   ┆ A     │
│ 2   ┆ B     │
│ 5   ┆ B     │
│ 9   ┆ B     │
│ 6   ┆ B     │
│ 3   ┆ B     │
└─────┴───────┘
""").lazy()

bins = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]

schema = pl.Struct([
    pl.Field(name="", dtype=pl.Float64),  # the cut values, renamed to "x" afterwards
    pl.Field(name="break_point", dtype=pl.Float64),
    pl.Field(name="category", dtype=pl.Categorical),
    pl.Field(name="group", dtype=pl.Utf8)
])

return_dtype = pl.List(schema)

(df.map(
    function=lambda df:
        df.select(
            pl.col("x").apply(
                # cut each group's values, attach the group label, and pack
                # the result into a struct so the UDF returns a single column
                lambda col: col.cut(bins, maintain_order=True)
                    .with_columns(df.limit(1).select("group"))
                    .to_struct(""),
                return_dtype=return_dtype
            ).over("group")),
    schema={"x": schema},
    streamable=True)  # mark the UDF as safe for the streaming engine
.unnest("x")
.rename({"": "x"})
.sink_parquet("sink.parquet"))
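
streamable=True is what makes the difference: it marks the UDF as safe for the streaming engine, which sink_parquet requires; without it the query falls back to the standard engine and panics as in the question. As a quick sanity check (a minimal sketch; sink.parquet is the file written above), you can read the sink back:

print(pl.read_parquet("sink.parquet"))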

Upvotes: 2
