Reputation: 111
Hi everyone.
I'm struggling with memory issues while sorting a larger-than-memory LazyFrame with Polars, using the sink_csv command.
I'm developing an engine that should be able to merge and sort multiple large datasets that come in the form of distinct CSV files.
I'm working with two 6GB files to merge and sort. I started with a pod with 8GB of RAM, but the application crashes even with 32GB.
Note: these files are not "conventional" CSV files, in the sense that they have custom "header" and "trailer" records in their composition. Each row is identified by a "record code" (the first 2 digits). Different record types may have a different number of fields, so I cannot simply read the whole file with a plain scan_csv + separator char.
Here is an example of one of those files:
00;XPTO;99999991000197;20240905;130444;000001;20240905;130444;000001;20240901;0000
01;99900000001;F;1;0;000000321
01;00000000001;F;2;0;000000123
01;77700000003;F;0;0;000000999
01;22200000004;F;0;0;000000999
01;12300000004;F;0;0;000000999
99;00000000005;
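For clarity, the record code is just the first ;-separated field of each line; in plain Python terms (illustrative only, not part of the pipeline):
>>> line = "01;99900000001;F;1;0;000000321"
>>> line.split(";")[0]
'01'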
What my code does:
import polars as pl
import os
# Polars tmp dir should be set to /data since /tmp does not have enough space
os.environ["POLARS_TEMP_DIR"] = "/data/testes/merger/tmp/"
pl.Config.set_verbose(True)
valid_record_codes = ["01", "02", "05", "06"]
sort_column_indexes = [1, 0]
ORIGINAL_ROW_COLUMN = "original_row"
ROW_AS_LIST_COLUMN = "row_as_list"
RECORD_CODE_COLUMN = "record_code"
SEPARATOR = ";"
# Read the input files
lf = pl.scan_csv(
    "./part_*",
    separator=chr(0000),
    has_header=False,
    new_columns=[ORIGINAL_ROW_COLUMN],
)
# Add a column on the dataframe for the record_code.
# As for now, the record code is always the first field of the dataframe
lf = lf.select(ORIGINAL_ROW_COLUMN).with_columns(
    pl.col(ORIGINAL_ROW_COLUMN).str.split(SEPARATOR).alias(ROW_AS_LIST_COLUMN)
)
# Eliminate undesired records from the dataframe
lf = lf.with_columns(
    pl.col(ROW_AS_LIST_COLUMN).list.get(0).alias(RECORD_CODE_COLUMN)
)
lf = lf.filter(pl.col(RECORD_CODE_COLUMN).is_in(valid_record_codes)).select(
    pl.col(ORIGINAL_ROW_COLUMN), pl.col(ROW_AS_LIST_COLUMN)
)
sort_columns = list()
# Add the sortable columns to the LazyFrame
for sort_column in sort_column_indexes:
    column_name = f"column_{sort_column}"
    lf = lf.with_columns(
        pl.col(ROW_AS_LIST_COLUMN)
        .list.get(sort_column)
        .alias(column_name)
    )
    sort_columns.append(column_name)
# Sort the dataframe
lf = lf.sort(sort_columns).select(ORIGINAL_ROW_COLUMN)
# Write the file
lf.sink_csv("output.csv", include_header=False)
The code works fine with small files, but with larger files Polars starts consuming a LOT of memory until the Python process dies.
Here is the execution output of the code:
>>> lf.sink_csv("output.csv", include_header=False)
RUN STREAMING PIPELINE
[csv -> hstack -> hstack -> filter -> fast_projection -> hstack -> hstack -> sort_multiple -> fast_projection -> parquet_sink]
STREAMING CHUNK SIZE: 600000 rows
OOC sort started
Temporary directory path in use: /data/testes/merger/tmp/
STREAMING CHUNK SIZE: 600000 rows
finished sinking into OOC sort in 539.231142825s
full file dump of OOC sort took 539.631121035s
spill size: 0 mb
processing 1375 files
Killed
I also watched the memory and disk usage on this pod while the job ran.
I already tried decreasing the chunk size in the Polars configuration (from 600k rows to 50k). This buys some more execution time, but the error still occurs.
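For reference, I'm changing the chunk size through the Polars config, along these lines (assuming set_streaming_chunk_size is the relevant knob for the streaming engine):
pl.Config.set_streaming_chunk_size(50_000)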
I also tried converting the sortable fields to Int64, but in that scenario the program dies even before creating all the partial files to sort.
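The Int64 attempt just adds a cast when the sort columns are created, roughly like this (a sketch of what I tried, not the exact code):
lf = lf.with_columns(
    pl.col(ROW_AS_LIST_COLUMN)
    .list.get(sort_column)
    .cast(pl.Int64)
    .alias(column_name)
)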
Are there any more configuration parameters that I could fiddle with to optimize memory usage?
Upvotes: 2
Views: 219
Reputation: 21144
I think you'll want to completely avoid using lists.
You can, for example, use .str.extract() to pull out the field at index N with a regex.
>>> pl.select(pl.lit("01;99900000001;F;1;0;000000321").str.extract(r"[^;]+;([^;]+)")).item()
'99900000001' # index 1
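Index 0 works the same way, with no prefix before the capture group:
>>> pl.select(pl.lit("01;99900000001;F;1;0;000000321").str.extract(r"([^;]+)")).item()
'01' # index 0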
If I swap out the list implementation with this approach:
lf = pl.scan_csv(
    "./part_*",
    separator=chr(0000),
    has_header=False,
    new_columns=[ORIGINAL_ROW_COLUMN],
)
lf = lf.filter(
    pl.col(ORIGINAL_ROW_COLUMN).str.extract(rf"([^{SEPARATOR}]+)").is_in(valid_record_codes)
)
lf = lf.with_columns(
    pl.col(ORIGINAL_ROW_COLUMN).str.extract(
        # build regex for index N: N non-captured fields, then capture the next field
        SEPARATOR.join([rf"[^{SEPARATOR}]+"] * index + [rf"([^{SEPARATOR}]+)"])
    )
    .alias(f"column_{index}")
    for index in sort_column_indexes
)
(lf.sort(r"^column_\d+$")
 .drop(r"^column_\d+$")
 .sink_csv("output.csv", include_header=False)
)
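For the sort_column_indexes = [1, 0] from the question, the join builds these patterns (assuming the list-based join shown above):
column_1 -> [^;]+;([^;]+)
column_0 -> ([^;]+)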
and increase the input file to 20_000_000 lines, it peaks at 5GB of RAM.
The original list version peaks at 10GB of RAM.
It's likely platform/spec dependent, but it suggests this could be an improved approach.
Upvotes: 2