Reputation: 448
I have 1000+ S3 files in a partitioned path and want to read all of them. I'm using Polars because it is fast compared to Pandas.
s3://bucket_name/rs_tables/name='part1'/key='abc'/date=''/part1_0000.parquet
I'm scanning these files with Polars:
import polars as pl

source = "s3://bucket_name/rs_tables/*/*/*/*.parquet"
storage_options = {
    "aws_access_key_id": access_key,
    "aws_secret_access_key": secret_key,
    "aws_session_token": token,
}
lazy_frame = pl.scan_parquet(source, storage_options=storage_options)
lazy_frame.collect()
Since these files have different schemas, the code throws a compute error:
ComputeError: schema of all files in a single scan_parquet must be equal
Is there an option like Spark's mergeSchema? Please suggest a solution to this problem.
Upvotes: 4
Views: 798
Reputation: 18691
Unfortunately, scan_parquet doesn't have that option. The pl.concat method does allow a relaxed vertical combination of frames, so you can use that; a quick illustration of what how='vertical_relaxed' does follows below.
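As a minimal local sketch (made-up toy data, no S3 involved), vertical_relaxed casts shared columns to their common supertype instead of erroring on a dtype mismatch:
import polars as pl

# Two frames whose shared column has different but compatible dtypes.
lf1 = pl.LazyFrame({"a": [1, 2]})        # a: Int64
lf2 = pl.LazyFrame({"a": [1.5, 2.5]})    # a: Float64

# how='vertical' would raise on the dtype mismatch;
# how='vertical_relaxed' coerces both to the supertype Float64.
out = pl.concat([lf1, lf2], how="vertical_relaxed").collect()
print(out.schema)  # {'a': Float64}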
There are two steps to this workaround. The first is to get a list of files, which requires JSON parsing; the second is to use a list comprehension to scan_parquet each file individually and wrap that in pl.concat with how='vertical_relaxed':
import json

source = "s3://bucket_name/rs_tables/*/*/*/*.parquet"
storage_options = {
    "aws_access_key_id": access_key,
    "aws_secret_access_key": secret_key,
    "aws_session_token": token,
}

# Serialize the scan plan to JSON to extract the list of resolved file paths.
file_paths = json.loads(
    pl.scan_parquet(source, storage_options=storage_options).serialize()
)["Scan"]["paths"]

# Scan each file individually and concatenate with supertype coercion.
lazyframe = pl.concat(
    [pl.scan_parquet(x, storage_options=storage_options) for x in file_paths],
    how="vertical_relaxed",
)
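From here, assuming the supertype coercion succeeds, the combined frame collects like any other LazyFrame:
df = lazyframe.collect()
print(df.shape)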
If the schemas differ by more than what vertical_relaxed can correct for, you may need to manually employ a select or with_columns (the former is better, so that all columns are explicit) with casts and transformations inside the concat. In that case you no longer need how='vertical_relaxed'. A made-up example might look like this:
lazyframe = pl.concat([
    pl.scan_parquet(x, storage_options=storage_options)
    .select(
        name="name",
        key="key",
        date=pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%d"),
        var1=pl.col("var1").cast(pl.Float64),
    )
    for x in file_paths
])
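If you're not sure which casts are needed, one way to find out is to group the files by schema first. This is a sketch rather than part of the workaround above: it assumes a recent Polars where LazyFrame.collect_schema() exists (older versions expose a .schema attribute instead), and the grouping logic is purely illustrative:
from collections import defaultdict

# Group file paths by their Parquet schema so the differing
# column names/dtypes are visible before writing the casts above.
schemas = defaultdict(list)
for path in file_paths:
    lf = pl.scan_parquet(path, storage_options=storage_options)
    schemas[tuple(lf.collect_schema().items())].append(path)

for schema, paths in schemas.items():
    print(len(paths), "file(s) with schema:", dict(schema))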
I made this enhancement request to give scan_parquet the option, so that this becomes easier to do.
Upvotes: 4