Memory issues sorting a larger-than-memory file with polars

Hi everyone.

I'm struggling with memory issues while sorting a larger-than-memory LazyFrame with polars, using the sink_csv command.

I'm developing an engine that should be able to merge and sort multiple large datasets that come in the form of distinct CSV files.

I'm working with two 6GB files to merge and sort. I started with a pod with 8GB of RAM, but the application crashes even with 32GB.

Note: Those files are not "conventional" CSV files, in the sense that they have custom "header" and "trailer" records in their composition. Each row is identified by a "record code" (the first 2 digits). Different record types might have a different number of fields, therefore I cannot simply read the whole file with a plain "scan_csv" + separator char.

Here is an example of one of those files:

00;XPTO;99999991000197;20240905;130444;000001;20240905;130444;000001;20240901;0000
01;99900000001;F;1;0;000000321
01;00000000001;F;2;0;000000123
01;77700000003;F;0;0;000000999
01;22200000004;F;0;0;000000999
01;12300000004;F;0;0;000000999
99;00000000005;

What my code does:

  1. Create a LazyFrame with all the parts to be merged
  2. Create a column with the original line "as-is", since I shouldn't change its formatting or contents in any way in the output file.
  3. Create a new column splitting the original line into a list using a provided separator (;).
  4. Filter undesired record codes from the dataframe (by filtering on the value of the first field of the list)
  5. Create new columns for the "sortable" fields, by reading the corresponding index of the "list" column.
  6. Sort the dataframe and sink it into a new file with pl.sink_csv

import polars as pl
import os

# Polars tmp dir should be set to /data since /tmp does not have enough space
os.environ["POLARS_TEMP_DIR"] = "/data/testes/merger/tmp/"
pl.Config.set_verbose(True)  

valid_record_codes = ["01", "02", "05", "06"]
sort_column_indexes = [1, 0]

ORIGINAL_ROW_COLUMN = "original_row"
ROW_AS_LIST_COLUMN = "row_as_list"
RECORD_CODE_COLUMN = "record_code"
SEPARATOR = ";"

# Read the input files. The NUL separator never appears in the data,
# so each physical line is read as a single string column
lf = pl.scan_csv(
    "./part_*",
    separator=chr(0000),
    has_header=False,
    new_columns=[ORIGINAL_ROW_COLUMN],
)

# Keep only the original row and add a column with the row split
# into a list of fields, using the provided separator
lf = lf.select(ORIGINAL_ROW_COLUMN).with_columns(
    pl.col(ORIGINAL_ROW_COLUMN).str.split(SEPARATOR).alias(ROW_AS_LIST_COLUMN)
)

# Add a column on the dataframe for the record_code.
# As for now, the record code is always the first field of the row
lf = lf.with_columns(
    pl.col(ROW_AS_LIST_COLUMN).list.get(0).alias(RECORD_CODE_COLUMN)
)

# Eliminate undesired records from the dataframe
lf = lf.filter(pl.col(RECORD_CODE_COLUMN).is_in(valid_record_codes)).select(
    pl.col(ORIGINAL_ROW_COLUMN), pl.col(ROW_AS_LIST_COLUMN)
)

sort_columns = list()

# Add the sortable columns to the LazyFrame
for sort_column in sort_column_indexes:
    column_name = f"column_{sort_column}"
    lf = lf.with_columns(
        pl.col(ROW_AS_LIST_COLUMN)
        .list.get(sort_column)
        .alias(column_name)
    )
    sort_columns.append(column_name)

# Sort the dataframe
lf = lf.sort(sort_columns).select(ORIGINAL_ROW_COLUMN)

# Write the file
lf.sink_csv("output.csv", include_header=False)

The code works fine with small files, but with larger files polars starts consuming A LOT of memory until the Python process dies.

Here is the execution output of the code:

>>> lf.sink_csv("output.csv", include_header=False)
RUN STREAMING PIPELINE
[csv -> hstack -> hstack -> filter -> fast_projection -> hstack -> hstack -> sort_multiple -> fast_projection -> parquet_sink]
STREAMING CHUNK SIZE: 600000 rows
OOC sort started
Temporary directory path in use: /data/testes/merger/tmp/
STREAMING CHUNK SIZE: 600000 rows
finished sinking into OOC sort in 539.231142825s
full file dump of OOC sort took 539.631121035s
spill size: 0 mb
processing 1375 files
Killed

I have also been watching the memory and disk usage on this pod while the job runs.

I already tried decreasing the chunk size in the polars configuration (from 600k to 50k rows). This buys me some more execution time, but the error still occurs.
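
For reference, the chunk size change is just the streaming chunk size setting in the polars config. A minimal sketch of what I ran (50_000 is simply the last value I tried):

import polars as pl

# Lower the number of rows per streaming chunk (the verbose output above reports 600000).
# Smaller chunks should reduce the peak memory footprint, at the cost of some throughput.
pl.Config.set_streaming_chunk_size(50_000)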

I also tried converting the sortable fields to Int64, but in that scenario the program dies even before creating all the partial files for the sort.
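
That attempt only changed the expression that builds the sortable columns, roughly like this (a sketch, not the exact code):

# Variant of the sort-column expression above, with an explicit cast on the extracted field
pl.col(ROW_AS_LIST_COLUMN).list.get(sort_column).cast(pl.Int64).alias(column_name)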

Are there any more configuration parameters that I could fiddle with to optimize memory usage?

Upvotes: 2

Views: 219

Answers (1)

jqurious

Reputation: 21144

I think you'll want to completely avoid using lists.

You can, for example, use .str.extract() to extract the field at index N with a regex.

>>> pl.select(pl.lit("01;99900000001;F;1;0;000000321").str.extract(r"[^;]+;([^;]+)")).item()
'99900000001' # index 1

If I swap out the list implementation for this approach:

lf = pl.scan_csv(
    "./part_*",
    separator=chr(0000),
    has_header=False,
    new_columns=[ORIGINAL_ROW_COLUMN],
)

lf = lf.filter(
    pl.col(ORIGINAL_ROW_COLUMN).str.extract(rf"([^{SEPARATOR}]+)").is_in(valid_record_codes)
)

lf = lf.with_columns(
    pl.col(ORIGINAL_ROW_COLUMN).str.extract(
        "".join([rf"[^{SEPARATOR}]+{SEPARATOR}"] * index) # skip the first `index` fields
        +
        rf"([^{SEPARATOR}]+)"                             # capture the field at index N
    )
    .alias(f"column_{index}") for index in sort_column_indexes
)

(lf.sort(r"^column_\d+$")
   .drop(r"^column_\d+$")
   .sink_csv("output.csv", include_header=False)
)

If I also increase the input file to 20_000_000 lines, it peaks at 5GB of RAM.

The original list version peaks at 10GB of RAM.

It's likely platform/spec dependent, but it does seem to suggest this could be an improved approach.

Upvotes: 2
