william_grisaitis

Reputation: 5951

How do I ensure that dask doesn't read unnecessary files from disk when querying a partitioned dataframe?

For example, say I have a parquet dataset in a directory tree, hive-style:

dataset/field1=A/field2=X/data.parquet
dataset/field1=A/field2=Y/data.parquet
dataset/field1=B/field2=X/data.parquet
dataset/field1=B/field2=Y/data.parquet

And say I define a dask dataframe like this:

import dask.dataframe as dd

df = dd.read_parquet("dataset/", engine="pyarrow")

Here, field1 and field2 are partition columns in my dataframe.

My question: how do I make sure dask only reads one parquet file if I do df.query("field1 == 'A' and field2 == 'X'")? (i.e., only read dataset/field1=A/field2=X/data.parquet)

Relatedly, is there a way of knowing which files dask is opening and reading? Maybe from a logger?

Upvotes: 1

Views: 307

Answers (1)

SultanOrazbayev

Reputation: 16581

This is supported by pyarrow when you pass filters at read time (rather than using the .query method):

from dask.dataframe import read_parquet
from dask.datasets import timeseries

# Build a toy dataset and write it partitioned by "id" and "date" (hive-style).
df = timeseries(end="2000-01-03", seed=0).reset_index()
df["date"] = df["timestamp"].dt.date.astype("str")
df.to_parquet("test.pqt", partition_on=["id", "date"])

# Pick the least common value of "id".
rare_id = df["id"].value_counts().compute().tail(1).index.values[0]

# Filtering at read time prunes partitions, so only matching files are loaded.
ddf = read_parquet("test.pqt", filters=[[("id", "==", rare_id)]], engine="pyarrow")
print(ddf.npartitions)  # 1
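
Applied to the layout in your question, the same idea would look roughly like this (a sketch, assuming the directory tree from the question and that field1/field2 are hive partition columns whose values are read back as strings):

import dask.dataframe as dd

# Only the files whose partition values match the filter are read; with this
# hive layout that should prune down to dataset/field1=A/field2=X/data.parquet.
ddf = dd.read_parquet(
    "dataset/",
    engine="pyarrow",
    filters=[("field1", "==", "A"), ("field2", "==", "X")],
)
print(ddf.npartitions)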

Edit: the timeseries snippet above works with dask=2022.1.0, but not with dask=2022.6.1; for details, check this GH issue.
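
As for seeing which files are actually touched: one option is to query the dataset with pyarrow directly and list the fragments that survive the partition filter. This is a sketch using pyarrow's dataset API (not a dask feature), again assuming the hive layout from the question:

import pyarrow.dataset as ds

dataset = ds.dataset("dataset/", format="parquet", partitioning="hive")
expr = (ds.field("field1") == "A") & (ds.field("field2") == "X")

# Each fragment corresponds to one parquet file that matches the filter.
for fragment in dataset.get_fragments(filter=expr):
    print(fragment.path)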

Upvotes: 2
