yoyoyoyo123

Reputation: 2472

Polars scan s3 multi-part parquet files

I have a multi-part, partitioned parquet dataset on s3. Each partition contains multiple parquet files. The code below narrows in on a single partition, which contains around 30 parquet files. When I use scan_parquet on an s3 address that includes a *.parquet wildcard, it only reads the first file in the partition. I verified this with the customer count: it matches the count from just the first file. Is there a way to make it scan across all the files?

import polars as pl

s3_loc = "s3://some_bucket/some_parquet/some_partion=123/*.parquet"
df = pl.scan_parquet(s3_loc)
cus_count = df.select(pl.count('customers')).collect()

If I leave the *.parquet wildcard off the s3 address, I get the following error.

exceptions.ArrowErrorException: ExternalFormat("File out of specification: A parquet file must containt a header and footer with at least 12 bytes")

Upvotes: 6

Views: 11356

Answers (2)

Dean MacGregor

Reputation: 18691

New Answer

Polars can natively load files from AWS, Azure, GCP, or plain HTTP, and it no longer relies on fsspec (much, if at all). Instead it uses the object_store crate under the hood. The syntax is:

pl.scan_parquet(
    "s3://some_bucket/some_parquet/some_partion=123/*.parquet",
    storage_options=dict_of_credentials,
)

The dict_of_credentials might be a bit different from what you normally feed to initialize an s3fs.S3FileSystem().

Additionally, you can omit storage_options if your credentials are set as environment variables, but the variables that work for s3fs may not be exactly what object_store looks for; the object_store documentation lists the variables it checks. A sketch follows below.
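As a minimal sketch (assuming the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_DEFAULT_REGION names that object_store reads; adjust to your environment), supplying credentials via environment variables instead of storage_options could look like:

import os
import polars as pl

# Hypothetical credential setup; in practice these would already be set
# in your shell or deployment environment.
os.environ["AWS_ACCESS_KEY_ID"] = "<access-key-id>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<secret-access-key>"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

# No storage_options needed; polars/object_store picks the credentials
# up from the environment.
df = pl.scan_parquet(
    "s3://some_bucket/some_parquet/some_partion=123/*.parquet"
).collect()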

Old Answer using pyarrow.dataset

It looks like, from the user guide section on multiple files, that doing this requires a loop that creates many lazy DataFrames which you then combine together (see the sketch below).
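A minimal sketch of that loop approach (the paths are hypothetical; reading each file through s3fs works across polars versions, at the cost of some laziness):

import polars as pl
import s3fs

fs = s3fs.S3FileSystem()

# List the parquet files in the partition.
files = fs.glob("some_bucket/some_parquet/some_partion=123/*.parquet")

# Build one lazy frame per file.
lazy_dfs = []
for path in files:
    with fs.open(path, "rb") as f:
        lazy_dfs.append(pl.read_parquet(f).lazy())

# Combine them and run the query across all files.
lazy_df = pl.concat(lazy_dfs)
cus_count = lazy_df.select(pl.col("customers").count()).collect()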

Another approach is to use the scan_pyarrow_dataset function (formerly scan_ds), which takes a pyarrow dataset object.

import polars as pl
import s3fs
import pyarrow.dataset as ds
fs = s3fs.S3FileSystem()
# you can also make a file system with anything fsspec supports
# S3FileSystem is just a wrapper for fsspec
s3_loc = "s3://some_bucket/some_parquet/some_partion=123"
myds = ds.dataset(s3_loc, filesystem=fs)
lazy_df = pl.scan_pyarrow_dataset(myds)
cus_count = lazy_df.select(pl.count('customers')).collect()

Upvotes: 6

Filippo Vitale

Reputation: 8113

The current 0.20.7 version of polars simply works, either:

  • eagerly with pl.read_parquet(source) or
  • lazily with pl.scan_parquet(source)

where source = "s3://bucket/*.parquet".
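For example (a minimal sketch; the bucket path is illustrative and it assumes AWS credentials are available in the environment):

import polars as pl

source = "s3://bucket/*.parquet"

# Eager: reads all matching files into a DataFrame immediately.
df = pl.read_parquet(source)

# Lazy: builds a query plan over all matching files; nothing is read
# until .collect() is called.
cus_count = pl.scan_parquet(source).select(pl.col("customers").count()).collect()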


If the AWS credentials are not available as environment variables, it is possible to pass a storage_options argument like:

storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options)

Furthermore, if the parquet files are saved with Hive partitions as S3 directories, it is now possible to leverage PyArrow for it:

import pyarrow.dataset as ds

pl.scan_pyarrow_dataset(
    ds.dataset("s3://my-partitioned-folder/", format="parquet")
)
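A follow-up usage sketch (the partition column some_partion and the filter value are illustrative):

import polars as pl
import pyarrow.dataset as ds

lazy_df = pl.scan_pyarrow_dataset(
    ds.dataset("s3://my-partitioned-folder/", format="parquet")
)

# Filtering on the partition column lets the predicate be pushed down to
# the pyarrow dataset, so only the matching partitions need to be scanned.
cus_count = (
    lazy_df
    .filter(pl.col("some_partion") == 123)
    .select(pl.col("customers").count())
    .collect()
)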

Upvotes: 2
