Jeroen Bos

Reputation: 97

PyArrow: read single file from partitioned parquet dataset is unexpectedly slow

I am having some problems with the speed of loading .parquet files, and I don't know what I am doing wrong.

Problem

I am trying to read a single .parquet file from my local filesystem; it is part of the partitioned output of a Spark job, so the .parquet files sit in hierarchical directories named a=x and b=y.

To achieve this, I am using pandas.read_parquet (which uses pyarrow.parquet.read_table under the hood) and passing the filters kwarg. The run time with filters is far longer than I would expect.

# The following runs for about 55 seconds
pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])

# The following runs for about 0.04 seconds
pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)

# The following runs for about 70 seconds
pd.read_parquet(<path_to_entire_dataset>)

Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, whereas I would expect the run time to be roughly linear in the number of files read.

What am I doing wrong here?

I realize that simply putting the partition values in the path would work (as sketched below), but this quickly becomes complex, because what I want to filter on can and will change. Besides, I think read_table should be able to load this data efficiently.
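For reference, a minimal sketch of that path-based workaround, building the subdirectory from the current filter values by hand (partition_path is a hypothetical helper; the placeholder path is as above):

import os

def partition_path(base_dir, partition_values):
    # partition_values, e.g. {'a': 'x', 'b': 'y'}; the key order must match
    # the directory hierarchy that the Spark job wrote.
    return os.path.join(base_dir, *(f'{k}={v}' for k, v in partition_values.items()))

pd.read_parquet(partition_path(<path_to_entire_dataset>, {'a': 'x', 'b': 'y'}))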

PS: The entire dataset contains many millions of rows, the data I want to load is only a few thousand rows.

Edit 1:

As suggested by 0x26res, I manually defined the partitioning. This led to a significant speed-up, but still not as much as I would have expected: the run time was about 5 seconds.

import pandas as pd
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

partitioning = HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.int32()),
    ])
)

pd.read_parquet(
    <path_to_entire_dataset>,
    engine='pyarrow',
    filters=[
        [
            ('a', '=', x),
            ('b', '=', y),
        ]
    ],
    partitioning=partitioning
)
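To check whether the filter actually prunes partitions (rather than every file being opened), the fragments that a scan would touch can be listed with the pyarrow.dataset API. A diagnostic sketch, using the same placeholder path and the same x and y filter values as above:

import pyarrow.dataset as ds

dataset = ds.dataset(<path_to_entire_dataset>, partitioning=partitioning)
# Keep only the fragments that survive partition pruning; with working
# pruning this should be the single file under a=x/b=y.
fragments = list(dataset.get_fragments(
    filter=(ds.field('a') == x) & (ds.field('b') == y)
))
print(len(fragments))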

Upvotes: 3

Views: 2544

Answers (1)

0x26res

Reputation: 13952

Given the run time, I suspect arrow is opening every file and then filtering.

Maybe you can try specifying the partitioning so arrow can be smarter about it:

import pandas as pd
import pyarrow as pa
import pyarrow.dataset

partitioning = pa.dataset.HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.string())
    ])
)

pd.read_parquet(
    <path_to_entire_dataset>,
    filters=[[('a', '=', 'x'), ('b', '=', 'y')]],
    partitioning=partitioning,
)
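The same read can also be expressed directly against the pyarrow.dataset API, skipping the pandas wrapper (a minimal sketch, same placeholder path and filter values):

import pyarrow.dataset as ds

dataset = ds.dataset(<path_to_entire_dataset>, partitioning=partitioning)
# Partition pruning happens here: only directories matching the filter are read.
table = dataset.to_table(filter=(ds.field('a') == 'x') & (ds.field('b') == 'y'))
df = table.to_pandas()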

Upvotes: 1
