shadowtalker

Reputation: 13903

Polars LazyFrame streaming directly to (partitioned) Parquet without collecting

I am reading some data from AWS S3 with polars.scan_pyarrow_dataset. When I am finished with my data processing, I would like to write the results back to cloud storage as partitioned Parquet files, but I'd like to avoid collecting all of the results in memory. I believe this should be possible with the Parquet format because of its support for row groups, as well as the ability to split the data into physically separate files.
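
For reference, a minimal sketch of the kind of setup I mean (the bucket path, partitioning, and column names are placeholders):

import pyarrow.dataset as ds
import polars as pl

# hypothetical input location; the real data is partitioned Parquet on S3
dataset = ds.dataset("s3://my-bucket/input/", format="parquet", partitioning="hive")
lf = pl.scan_pyarrow_dataset(dataset)
# ... lazy transformations on lf ...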

I don't see any support for this in the Polars LazyFrame API. I see that collect has streaming=True, but that still looks like it collects the final result in memory.
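
In other words, something like the following still seems to materialize the entire result before anything can be written (a sketch; file names are placeholders):

# the streaming engine processes the query in batches, but collect()
# still returns the whole result as a single in-memory DataFrame
result = lf.collect(streaming=True)
result.write_parquet("output.parquet")  # only possible after full materialization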

Is there some way to achieve this that I am not seeing?

Upvotes: 2

Views: 2528

Answers (2)

user13800521

Reputation: 1

I think your best bet will be to determine how you are partitioning your pyarrow dataset and iterate through those partitions. That would mean creating the LazyFrame with the full query, then iterating through the unique values of your partition column, collecting that smaller DataFrame each time, and writing it as a pyarrow partitioned dataset.

import polars as pl
import pyarrow.dataset as ds

df: pl.LazyFrame = ...  # the full lazy query
for year in full_dataset_years:
    # collect only this partition's rows, then hand them to pyarrow to write
    ds.write_dataset(
        df.filter(pl.col("year") == pl.lit(year)).collect().to_arrow(),
        ...
    )
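
full_dataset_years isn't defined above; one way to get it (assuming the partition column is literally called "year") is to collect just that column's unique values first:

# hypothetical: derive the partition values from the lazy query itself
full_dataset_years = (
    df.select(pl.col("year").unique()).collect().to_series().to_list()
)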

Upvotes: 0

Dean MacGregor

Reputation: 18691

polars doesn't yet have a partitioned writer. Its streaming engine also doesn't support pyarrow datasets.

It does support S3 files directly though, so unless you're working with some special case, you can skip scan_pyarrow_dataset and use the built-in cloud reader.

See the scan_parquet docs. If you give it the appropriate storage_options, you can read a hive partition directly. One difference is that, in contrast to the ds.dataset("/path/to/root_directory", ...) syntax, you'd do pl.scan_parquet("s3://path/to/root_directory/**/**/*.parquet", ...).
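
The storage_options dict is just the credentials/config for the object store. For S3 it might look something like this (the keys shown are illustrative; adjust them to however you authenticate):

# hypothetical credentials; in practice these often come from the environment
your_options_dict = {
    "aws_access_key_id": "...",
    "aws_secret_access_key": "...",
    "aws_region": "us-east-1",
}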

Given a LazyFrame generated that way, you will have streaming support. sink_parquet doesn't have cloud support though, so you could sink_parquet to a local file and then use the pyarrow dataset writer to partition it to its ultimate destination.

For example:

import polars as pl
import pyarrow.dataset as ds
import s3fs

df = pl.scan_parquet("s3://path/to/root_directory/**/**/*.parquet",
                     storage_options=your_options_dict)

df = df  # all your data processing

# streams the query result to a local file without collecting it in memory
df.sink_parquet("some_local.parquet")

# re-read the local file with pyarrow and write it out partitioned on S3
sourcepq = ds.dataset("some_local.parquet").scanner()

s3_filesystem = s3fs.S3FileSystem(...)

ds.write_dataset(sourcepq,
                 "/destination/path/on/s3",
                 filesystem=s3_filesystem,
                 partitioning=['col1', 'col2'],
                 partitioning_flavor='hive')

Upvotes: 6
