Isaac

Reputation: 3616

How to apply multiple filters in an aggregation in polars while staying in streaming mode

I have a fairly large dataset, and I would like to run several aggregations on it without loading the whole thing into memory. Specifically, I have a list of simple filters of the form pl.col('x1') == 'y1', and I'd like to get the result of my aggregation under each of these filters separately.

I can get the desired result by running

dataset.filter(
  pl.col(x) == y
).groupby(pl.col('a')).agg(
  pl.col('b').sum()
).collect(streaming=True)

for each x and y separately, but I have somewhere on the order of a hundred different filters, so taking this many passes over the dataset is very time-consuming.

I expected to be able to get the same result in a single pass with something like

dataset.groupby(pl.col('a')).agg(
  *[pl.col('b').filter(pl.col(x) == y).sum().alias(f'{x}={y}') for x, y in filters]
).collect(streaming=True)

but while this works for a small subset of the data, as soon as I try to run it on the full dataset (even with a single filter), the process consumes all memory on the machine and dies. I assume this means that polars is not running this query in streaming mode, even though all individual components I'm using should be streaming-compatible.

Is there a way to get my desired results using a single pass in streaming mode? Is this something that polars doesn't yet support?

Upvotes: 0

Views: 993

Answers (1)

Isaac

Reputation: 3616

It seems that the solution here is to use pl.when(...).then(...).otherwise(...) instead of pl.col(...).filter(...). My working solution looks like:

dataset.groupby(pl.col('a')).agg(
  *[pl.when(pl.col(x) == y)
    .then(pl.col('b'))
    .otherwise(pl.lit(0))
    .sum().alias(f'{x}={y}') for x, y in filters]
).collect(streaming=True)

which computes all my aggregations in a single pass over the dataset, without ever holding more than a fraction of the dataset in memory.
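To make it clearer why the when/then/otherwise form fits a single streaming pass, here is a rough pure-Python sketch of the same idea. It is not how polars implements it internally; the function name conditional_sums, the chunked list-of-dicts input, and the sample data are all made up for illustration. The point is only that each row adds either b or 0 to a running total per group and per filter, so nothing beyond the current chunk and the totals needs to be kept in memory.

```python
from collections import defaultdict


def conditional_sums(chunks, filters, group_key="a", value_key="b"):
    """Hypothetical helper: one pass over row chunks, keeping one
    running sum per (group, filter) pair."""
    totals = defaultdict(lambda: {f"{x}={y}": 0 for x, y in filters})
    for chunk in chunks:  # only one chunk is looked at at a time
        for row in chunk:
            sums = totals[row[group_key]]
            for x, y in filters:
                # Analogue of when(col(x) == y).then(col('b')).otherwise(0):
                # a matching row contributes b, any other row contributes 0.
                sums[f"{x}={y}"] += row[value_key] if row[x] == y else 0
    return dict(totals)


# Made-up sample data, split into two chunks to mimic streaming batches.
chunks = [
    [
        {"a": "g1", "x1": "y1", "x2": "n", "b": 1},
        {"a": "g1", "x1": "n", "x2": "y2", "b": 2},
    ],
    [
        {"a": "g2", "x1": "y1", "x2": "y2", "b": 3},
        {"a": "g1", "x1": "y1", "x2": "y2", "b": 4},
    ],
]
filters = [("x1", "y1"), ("x2", "y2")]

result = conditional_sums(chunks, filters)
print(result)
```

Note that filling with 0 is only harmless because the aggregation is a sum. For something like a mean, the zeros would count as real values and skew the result.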

I'm still unclear on whether this difference in streaming support between pl.col.filter and pl.when is

  • me misunderstanding the col.filter expression,
  • a design constraint of the streaming API,
  • a bug, or
  • just a feature that has yet to be implemented.

Upvotes: 0
