Reputation: 3616
I have a large-ish dataset, and I would like to run several aggregations on it without loading the whole thing into memory. Specifically, I have a list of simple filters of the form pl.col('x1') == 'y1', and I'd like to get the result of my aggregation under each of these filters separately.
I can get the desired result by running
dataset.filter(
    pl.col(x) == y
).groupby(pl.col('a')).agg(
    pl.col('b').sum()
).collect(streaming=True)
for each x and y separately, but I have on the order of a hundred different filters, so taking that many passes over the dataset is very time-consuming.
I expected that I should be able to get the desired result by doing something like
dataset.groupby(pl.col('a')).agg(
    *[pl.col('b').filter(pl.col(x) == y).sum().alias(f'{x}={y}') for x, y in filters]
).collect(streaming=True)
but while this works for a small subset of the data, as soon as I try to run it on the full dataset (even with a single filter), the process consumes all memory on the machine and dies. I assume this means that polars is not running this query in streaming mode, even though all individual components I'm using should be streaming-compatible.
Is there a way to get my desired results using a single pass in streaming mode? Is this something that polars doesn't yet support?
Reputation: 3616
It seems that the solution here is to use polars.when instead of polars.col.filter. My working solution looks like:
dataset.groupby(pl.col('a')).agg(
    *[pl.when(pl.col(x) == y)
        .then(pl.col('b'))
        .otherwise(pl.lit(0))
        .sum().alias(f'{x}={y}') for x, y in filters]
).collect(streaming=True)
which computes all my aggregations in a single pass over the dataset, without ever holding more than a fraction of the dataset in memory.
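To make the semantics of the conditional sum concrete, here is a minimal plain-Python sketch (no polars involved) of what the when/then/otherwise aggregation computes in one pass. The function name single_pass_sums and the sample rows are made up for illustration, and this says nothing about how polars implements streaming internally.

```python
from collections import defaultdict


def single_pass_sums(rows, filters, group_key="a", value_key="b"):
    """Make one pass over rows; per group, keep one conditional sum per filter."""
    # Every group starts with a zero total for every (column, value) filter.
    totals = defaultdict(lambda: {f"{x}={y}": 0 for x, y in filters})
    for row in rows:  # rows may be any iterable, e.g. a generator reading from disk
        group = totals[row[group_key]]
        for x, y in filters:
            # Mirrors when(col(x) == y).then(col('b')).otherwise(0):
            # a non-matching row contributes 0 instead of being dropped.
            group[f"{x}={y}"] += row[value_key] if row[x] == y else 0
    return {key: dict(sums) for key, sums in totals.items()}


# Hypothetical sample data, not taken from the real dataset.
rows = [
    {"a": "g1", "b": 1, "x1": "y1", "x2": "n"},
    {"a": "g1", "b": 2, "x1": "n", "x2": "y2"},
    {"a": "g2", "b": 5, "x1": "n", "x2": "y2"},
]
filters = [("x1", "y1"), ("x2", "y2")]
result = single_pass_sums(rows, filters)
```

One difference from the one-query-per-filter approach is worth noting: a group with no matching rows for some filter (g2 under x1=y1 here) still appears, with a sum of 0, whereas filtering before grouping would leave that group out of the result entirely.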
I'm still unclear on whether this difference in streaming support between pl.col.filter and pl.when is [...] col.filter expression, [...]