Kavya shree

Reputation: 448

Reading partitioned multi-schema parquet files from S3 using Polars

I have 1000+ S3 files in a partitioned path and want to read all of them. I'm using Polars because it is fast compared to Pandas.

s3://bucket_name/rs_tables/name='part1'/key='abc'/date=''/part1_0000.parquet

I'm scanning these files with Polars:

    import polars as pl

    source = "s3://bucket_name/rs_tables/*/*/*/*.parquet"
    storage_options = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_session_token": token
    }

    # Lazily scan every file matched by the glob, then materialize it.
    lazyFrame = pl.scan_parquet(source, storage_options=storage_options)
    lazyFrame.collect()

Since these files have different schemas, the code throws a compute error:

    ComputeError: schema of all files in a single scan_parquet must be equal

Is there an option like mergeSchema in Spark? Please suggest solutions to this problem.

Upvotes: 4

Views: 798

Answers (1)

Dean MacGregor

Reputation: 18691

Unfortunately scan_parquet doesn't have that option.

The pl.concat function does allow a vertical relaxed combination of frames, so you can use that.
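
To illustrate what the "relaxed" part does, here is a minimal sketch with made-up frames: the column name matches but the dtypes differ, and vertical_relaxed upcasts to the common supertype instead of raising a schema error.

    import polars as pl

    # Two made-up frames sharing a column name but not a dtype.
    a = pl.LazyFrame({"x": [1, 2]})        # x: Int64
    b = pl.LazyFrame({"x": [3.5, 4.5]})    # x: Float64

    # how="vertical" would raise a schema error here;
    # how="vertical_relaxed" upcasts x to Float64 and stacks the rows.
    out = pl.concat([a, b], how="vertical_relaxed").collect()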

There are two steps to this workaround.

The first is to get a list of the files, which requires some JSON parsing. The second is to use a list comprehension to scan_parquet each file individually and wrap that in pl.concat with how='vertical_relaxed'.

    import json
    import polars as pl

    source = "s3://bucket_name/rs_tables/*/*/*/*.parquet"
    storage_options = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_session_token": token
    }

    # Serialize the scan's logical plan to JSON and pull out the list of
    # files the glob expanded to.
    file_paths = json.loads(
        pl.scan_parquet(
            source, storage_options=storage_options
        ).serialize()
    )["Scan"]["paths"]

    # Scan each file individually and concatenate, relaxing dtype mismatches.
    lazyframe = pl.concat([
        pl.scan_parquet(x, storage_options=storage_options)
        for x in file_paths
    ], how="vertical_relaxed")

If the schemas differ by more than what vertical_relaxed can correct for, then you may need to manually employ a select or with_columns (the former is preferable so that all columns are explicit) with casts and transformations inside the concat. In that case you no longer need how='vertical_relaxed'. A made-up example might look like this:

    lazyframe = pl.concat([
        pl.scan_parquet(x, storage_options=storage_options)
        # Make each file's output schema explicit with casts/transformations.
        .select(
            name="name",
            key="key",
            date=pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%d"),
            var1=pl.col("var1").cast(pl.Float64)
        )
        for x in file_paths
    ])
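
As an aside, if the files also differ in which columns exist (not just in their dtypes), pl.concat additionally accepts how='diagonal_relaxed', which unions the column sets, fills the gaps with nulls, and still relaxes dtype mismatches. A minimal sketch with made-up columns:

    import polars as pl

    # Made-up frames with different column sets and dtypes.
    a = pl.LazyFrame({"name": ["part1"], "var1": [1]})      # var1: Int64
    b = pl.LazyFrame({"name": ["part2"], "var2": [2.0]})    # no var1, extra var2

    # Result has columns name, var1, var2; missing values become null.
    out = pl.concat([a, b], how="diagonal_relaxed").collect()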

I made this enhancement request to give scan_parquet that option, so that this becomes easier to do.

Upvotes: 4
