Scott Syms

Reputation: 41

directory globbing with partitioned dask read_parquet directories

I have a directory of partitioned weather station readings that I've written with pandas/pyarrow.

c.to_parquet(path=f"data/{filename}.parquet", engine='pyarrow', compression='snappy', partition_cols=['STATION', 'ELEMENT'])

When I attempt to read a handful of files back with a glob and predicate pushdown filters like below

ddf= dd.read_parquet("data/*.parquet", engine='pyarrow', gather_statistics=True, filters=[('STATION', '==', 'CA008202251'), ('ELEMENT', '==', 'TAVG')], columns=['TIME','ELEMENT','VALUE', 'STATION'])

I get an IndexError:

IndexError                                Traceback (most recent call last)
<timed exec> in <module>

/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    314         gather_statistics = True
    315 
--> 316     read_metadata_result = engine.read_metadata(
    317         fs,
    318         paths,

/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    540             split_row_groups,
    541             gather_statistics,
--> 542         ) = cls._gather_metadata(
    543             paths,
    544             fs,

/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _gather_metadata(cls, paths, fs, split_row_groups, gather_statistics, filters, index, dataset_kwargs)
   1786 
   1787         # Step 1: Create a ParquetDataset object
-> 1788         dataset, base, fns = _get_dataset_object(paths, fs, filters, dataset_kwargs)
   1789         if fns == [None]:
   1790             # This is a single file. No danger in gathering statistics

/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _get_dataset_object(paths, fs, filters, dataset_kwargs)
   1740         if proxy_metadata:
   1741             dataset.metadata = proxy_metadata
-> 1742     elif fs.isdir(paths[0]):
   1743         # This is a directory.  We can let pyarrow do its thing.
   1744         # Note: In the future, it may be best to avoid listing the

IndexError: list index out of range

I can load the parquet directories individually

ddf= dd.read_parquet("data/2000.parquet", engine='pyarrow', gather_statistics=True, filters=[('STATION', '==', 'CA008202251'), ('ELEMENT', '==', 'TAVG')], columns=['TIME','ELEMENT','VALUE', 'STATION'])

Is globbing possible with dask/parquet/pyarrow reads?

Upvotes: 3

Views: 952

Answers (2)

Powers

Reputation: 19328

The "data/*.parquet" part is probably what's causing you issues. You need to provide the root paths to your partitioned lake without the *.

Here's an example code snippet that works:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(
    [
        ["north america", "mexico", "carlos"],
        ["asia", "india", "ram"],
        ["asia", "china", "li"],
    ],
    columns=["continent", "country", "first_name"],
)
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet(
    "tmp/partition/2", engine="pyarrow", partition_on=["continent", "country"]
)

ddf = dd.read_parquet(
    "tmp/partition/2",
    engine="pyarrow",
    filters=[("continent", "==", "asia"), ("country", "==", "china")],
)

Notice that read_parquet is being called on "tmp/partition/2", not on a glob pattern with an asterisk.

Upvotes: 1

SultanOrazbayev

Reputation: 16581

When using partition_cols in .to_parquet, the partitioned data is written out as a nested tree of files, so data/2000.parquet in your case is likely a folder rather than a single file.

import pandas as pd
from os.path import isdir

# test dataframe
df = pd.DataFrame(range(3), columns=['a'])
df['b'] = df['a']
df['c'] = df['a']

# save without partitioning
df.to_parquet('test.parquet')
print(isdir('test.parquet')) # False

# save with partitioning
df.to_parquet('test_partitioned.parquet', partition_cols=['a', 'b'])
print(isdir('test_partitioned.parquet')) # True

As a way out of this, it might be a good solution to construct an explicit list of parquet files with os.walk or glob, as in the sketch below. Note that if there are multiple partition columns, there will be multiple levels of nested folders containing parquet files, so a simple glob will not be sufficient and you will want to do a recursive search.
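For example, a minimal sketch along those lines (the directory layout and the "data/*.parquet/**/*.parquet" pattern are assumptions based on the question; adjust them to your actual tree):

import glob
import dask.dataframe as dd

# Recursively collect the leaf parquet files under each partitioned year folder
# (assumed layout: data/<year>.parquet/STATION=.../ELEMENT=.../*.parquet)
files = sorted(glob.glob("data/*.parquet/**/*.parquet", recursive=True))

# Pass the explicit file list instead of a glob string
ddf = dd.read_parquet(files, engine="pyarrow")

Keep in mind that when leaf files are listed explicitly, columns that exist only as directory names (STATION and ELEMENT here) may not be reconstructed the same way as when reading the partitioned folder itself, so check the resulting columns before relying on filters against them.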

Alternatively, one can construct dask.dataframes for each year and then concatenate them with dd.concat.
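A sketch of that approach, reusing the per-directory read_parquet call that already works in the question (the glob over the year folders is an assumption about the layout):

import glob
import dask.dataframe as dd

# Each entry is one partitioned year folder, e.g. data/2000.parquet
year_dirs = sorted(glob.glob("data/*.parquet"))

# Build a lazy dataframe per year, then concatenate them
parts = [
    dd.read_parquet(
        d,
        engine="pyarrow",
        filters=[("STATION", "==", "CA008202251"), ("ELEMENT", "==", "TAVG")],
        columns=["TIME", "ELEMENT", "VALUE", "STATION"],
    )
    for d in year_dirs
]
ddf = dd.concat(parts)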

Upvotes: 2
