Reputation: 41
I have a directory of partitioned weather station readings that I've written with pandas/pyarrow:
c.to_parquet(path=f"data/{filename}.parquet", engine='pyarrow', compression='snappy', partition_cols=['STATION', 'ELEMENT'])
When I attempt to read a handful of files back with a glob and predicate-pushdown filters like the call below
ddf = dd.read_parquet("data/*.parquet", engine='pyarrow', gather_statistics=True, filters=[('STATION', '==', 'CA008202251'), ('ELEMENT', '==', 'TAVG')], columns=['TIME', 'ELEMENT', 'VALUE', 'STATION'])
I get an IndexError:
IndexError Traceback (most recent call last)
<timed exec> in <module>
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
314 gather_statistics = True
315
--> 316 read_metadata_result = engine.read_metadata(
317 fs,
318 paths,
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
540 split_row_groups,
541 gather_statistics,
--> 542 ) = cls._gather_metadata(
543 paths,
544 fs,
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _gather_metadata(cls, paths, fs, split_row_groups, gather_statistics, filters, index, dataset_kwargs)
1786
1787 # Step 1: Create a ParquetDataset object
-> 1788 dataset, base, fns = _get_dataset_object(paths, fs, filters, dataset_kwargs)
1789 if fns == [None]:
1790 # This is a single file. No danger in gathering statistics
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _get_dataset_object(paths, fs, filters, dataset_kwargs)
1740 if proxy_metadata:
1741 dataset.metadata = proxy_metadata
-> 1742 elif fs.isdir(paths[0]):
1743 # This is a directory. We can let pyarrow do its thing.
1744 # Note: In the future, it may be best to avoid listing the
IndexError: list index out of range
I can load the parquet directories individually
ddf = dd.read_parquet("data/2000.parquet", engine='pyarrow', gather_statistics=True, filters=[('STATION', '==', 'CA008202251'), ('ELEMENT', '==', 'TAVG')], columns=['TIME', 'ELEMENT', 'VALUE', 'STATION'])
Is globbing possible with dask/parquet/pyarrow reads?
Upvotes: 3
Views: 952
Reputation: 19328
The "data/*.parquet"
part is probably what's causing you issues. You need to provide the root paths to your partitioned lake without the *
.
Here's an example code snippet that works:
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(
[
["north america", "mexico", "carlos"],
["asia", "india", "ram"],
["asia", "china", "li"],
],
columns=["continent", "country", "first_name"],
)
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet(
"tmp/partition/2", engine="pyarrow", partition_on=["continent", "country"]
)
ddf = dd.read_parquet(
"tmp/partition/2",
engine="pyarrow",
filters=[("continent", "==", "asia"), ("country", "==", "china")],
)
Notice that read_parquet is being called on "tmp/partition/2", not on a path with an asterisk.
Upvotes: 1
Reputation: 16581
When using partition_cols in .to_parquet, the partitioned data is written as a nested directory of separate files, so data/2000.parquet in your case is likely a folder.
import pandas as pd
from os.path import isdir
# test dataframe
df = pd.DataFrame(range(3), columns=['a'])
df['b'] = df['a']
df['c'] = df['a']
# save without partitioning
df.to_parquet('test.parquet')
print(isdir('test.parquet')) # False
# save with partitioning
df.to_parquet('test_partitioned.parquet', partition_cols=['a', 'b'])
print(isdir('test_partitioned.parquet')) # True
As a way out of this, a good solution might be to construct an explicit list of parquet files with os.walk or glob. Note that if there are multiple partition columns, there will be multiple levels of nested folders containing parquet files, so a simple glob will not be sufficient and you will want a recursive search.
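For example, here is a minimal sketch of the recursive-glob idea, assuming the hive-style layout the question's to_parquet call produces (data/<year>.parquet/STATION=.../ELEMENT=.../...); the pattern is illustrative and may need adjusting to your actual tree:
import glob
import dask.dataframe as dd

# recursive=True lets ** descend through the nested STATION=/ELEMENT= folders
files = sorted(glob.glob("data/*.parquet/**/*.parquet", recursive=True))

# dd.read_parquet also accepts an explicit list of file paths
ddf = dd.read_parquet(files, engine="pyarrow")
One caveat: when the leaf files are listed explicitly, the partition columns (STATION, ELEMENT) are encoded in the directory names rather than stored inside the files, so whether they come back as columns depends on the dask/pyarrow versions in use.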
Alternatively, one can construct a dask DataFrame for each year and then concatenate them with dd.concat, as sketched below.
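A minimal sketch of that per-year approach, assuming each year was written to its own partitioned folder as in the question (data/2000.parquet, data/2001.parquet, ...); the year range here is hypothetical:
import dask.dataframe as dd

years = range(2000, 2003)  # hypothetical; use the years actually on disk
frames = [
    dd.read_parquet(
        f"data/{year}.parquet",
        engine="pyarrow",
        filters=[("STATION", "==", "CA008202251"), ("ELEMENT", "==", "TAVG")],
        columns=["TIME", "ELEMENT", "VALUE", "STATION"],
    )
    for year in years
]

# concatenate the lazy per-year frames into one dask DataFrame
ddf = dd.concat(frames)
Each per-directory read mirrors the call that already works in the question, and dd.concat stitches the lazy frames together without loading anything eagerly.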
Upvotes: 2