Reputation: 31
I am currently reading parquet files from a delta lake with pl.scan_delta(...)
and want to filter the LazyFrame on lower case values of a string column.
The code below loads everything into memory and kills my kernel.
lazy_df = (
    pl.scan_delta(...)
    .filter(pl.col("partition_col") == "A")
    .filter(pl.col("parname").str.to_lowercase().is_in(['a', 'b', 'c']))
    .select(*columns)
)
lazy_df.collect()
But without the str expression, everything will fit into memory. It seems like it needs to read the whole column into memory to be able to run lowercase on it? Is that true?
lazy_df = (
    pl.scan_delta(...)
    .filter(pl.col("partition_col") == "A")
    .filter(pl.col("parname").is_in(['A', 'a', 'b', 'C']))
    .select(*columns)
)
lazy_df.collect()
When running print(lazy_df.explain()) on the frames I get the following.

With the str.to_lowercase() expression:

FILTER [(col("parname").str.lowercase().is_in([Series])) & ([(col("partition_col")) == (Utf8(A))])] FROM
PYTHON SCAN
  PROJECT */16 COLUMNS

Without it:

PYTHON SCAN
  PROJECT */16 COLUMNS
  SELECTION: ((pa.compute.field('parname')).isin(["A","a","b","C"]) & (pa.compute.field('partition_col') == 'A'))
Also, how can I filter this frame on lower-case values of a column in a memory-efficient way?
Upvotes: 1
Views: 855
Reputation: 18691
I want to address a potential misconception. You say
But without the str expression, everything will fit into memory. It seems like it needs to read the whole column into memory to be able to run lowercase on it? Is that true?
When you collect, everything is put in memory. The issue isn't the size of the column by itself; it's the overhead of the operation. Polars' and Arrow's memory layout is really optimized for numbers; it doesn't handle string manipulation as well.
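As a rough illustration (a toy sketch, not your delta table), lower-casing a string column materializes a whole second copy of the data on top of the original column:
import polars as pl

# Toy example: the column itself is modest in size, but the lowercase
# operation allocates a second, similarly sized string buffer.
s = pl.Series("parname", ["Alpha", "Beta", "Gamma"] * 1_000_000)
print(s.estimated_size("mb"))                     # size of the original column
print(s.str.to_lowercase().estimated_size("mb"))  # a fresh buffer of about the same size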
Main point
In addition to the inefficiency of string methods, you're also hamstrung by the fact that polars doesn't read delta natively; it goes through the deltalake library, and what you get from that library is a scan_pyarrow_dataset. You can see this either by looking at the source code or by noting the "PYTHON SCAN" in the explain output. Unfortunately, the streaming engine doesn't work with pyarrow datasets. What you can do instead is use pyarrow directly with a scanner and batches to ease the memory requirements.
import deltalake
import pyarrow.compute as pc
import pyarrow.dataset as ds
import polars as pl

### This is basically a copy-paste from the scan_delta source
dl_ds = deltalake.DeltaTable(
    ## Parameters for you to set or remove
    table_path,
    version=version,
    storage_options=storage_options,
    **delta_table_options,
).to_pyarrow_dataset(**pyarrow_options)

## This is the scanner/filter generator
batch_generator = dl_ds.scanner(
    filter=(
        pc.ascii_lower(ds.field("parname")).isin(["a", "b", "c"])
        & (ds.field("partition_col") == "A")
    ),
    columns=columns,
    batch_size=500,  # increase batch_size for speed, reduce it to use less memory
).to_batches()

## Convert each record batch to polars and concatenate them at the end
frames = [pl.from_arrow(batch).lazy() for batch in batch_generator]
lazy_df = pl.concat(frames)
lazy_df.collect()
Note that batch_size=500 is probably way too small. pyarrow's default is 128Ki, and it is measured in rows.
I put columns=columns in the scanner. That assumes you just have a list of strings; if your columns are polars expressions then it won't work. There's a reasonable chance you can replicate your polars expressions with pyarrow ones. Here's the doc page for Scanner arguments.
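For instance (a sketch reusing your column names; ascii_lower assumes ASCII-only data, utf8_lower is safer otherwise), the two polars filters from your question translate to pyarrow expressions like this:
import pyarrow.compute as pc
import pyarrow.dataset as ds

# polars: pl.col("parname").str.to_lowercase().is_in(["a", "b", "c"])
parname_filter = pc.ascii_lower(ds.field("parname")).isin(["a", "b", "c"])

# polars: pl.col("partition_col") == "A"
partition_filter = ds.field("partition_col") == "A"

# Combined expression, usable as the scanner's filter= argument
combined_filter = parname_filter & partition_filter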
Upvotes: 2