Reputation: 408
I have a large dataset of daily files located at /some/data/{YYYYMMDD}.parquet (the layout could also be something like /some/data/{YYYY}/{MM}/{YYYYMMDD}.parquet).
I describe the data source in a mycat.yaml file as follows:
sources:
  source_partitioned:
    args:
      engine: pyarrow
      urlpath: "/some/data/*.parquet"
    description: ''
    driver: intake_parquet.source.ParquetSource
I want to be able to read a subset of the files (partitions) into memory.
If I run source = intake.open_catalog('mycat.yaml').source_partitioned; print(source.npartitions), I get 0, probably because the partition information is not yet initialized. After source.discover(), source.npartitions is updated to 1726, which is exactly the number of individual files on disk.
How would I load the data for only a subset of these partitions (e.g. a specific date range)?
If this is described somewhere on the wiki, feel free to point me to the appropriate section.
Note: after thinking about it a little more, I realized this is probably related to dask functionality, and my goal can likely be achieved by converting the source to a dask dataframe with the .to_dask method, roughly as in the sketch below. Therefore I am putting the dask tag on this question.
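For reference, a minimal sketch of what I mean (names match the catalog above; whether this avoids reading all files up front is exactly what I am unsure about):
import intake

# open the catalog entry and turn it into a lazy dask dataframe
source = intake.open_catalog('mycat.yaml').source_partitioned
ddf = source.to_dask()
print(ddf.npartitions)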
Upvotes: 2
Views: 373
Reputation: 16581
This is a follow-up to a comment on my previous answer.
If the parquet files are indexed by (non-overlapping) time, then dask does not need to read every file into memory: only the metadata of each file is loaded up front, and only the relevant files are read into memory when a subset is requested:
from dask.datasets import timeseries

# this df will have 30 daily partitions
df = timeseries()

# this selection only needs to read 1 partition
df.loc["2000-01-03"]
This can be useful if the downstream workflow operates on different subsets of a large dataframe, but which subsets are needed changes dynamically. The fixed cost of creating the large dask dataframe (using metadata only) is then incurred once, and dask takes care of selecting the needed subsets of the data.
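A minimal sketch with parquet files (the path and pyarrow engine are taken from the question; this assumes the files carry a datetime index whose statistics dask can use to determine the partition divisions):
import dask.dataframe as dd

# only the parquet metadata is read at this point
# (in recent dask versions you may need calculate_divisions=True for .loc to skip partitions)
ddf = dd.read_parquet("/some/data/*.parquet", engine="pyarrow")

# on compute, only the partitions covering this day are loaded into memory
subset = ddf.loc["2000-01-03"].compute()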
If the parquet files are not indexed by time and the time information is only in the filename, then dask will not parse that information from the filename. In this case, some of the options are:
- writing a custom-loader function that filters the required filenames and passes them to dask (a rough sketch follows after this list); this reduces the fixed cost of creating the dask dataframe and is useful when it is known which subset of the overall data is needed;
- using intake as per my previous answer.
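A rough sketch of such a custom loader (the function name load_date_range and the filename parsing are assumptions based on the /some/data/{YYYYMMDD}.parquet pattern from the question):
import dask.dataframe as dd
from glob import glob
from pathlib import Path

def load_date_range(start, end, pattern="/some/data/*.parquet"):
    # keep only files whose YYYYMMDD stem falls inside [start, end]
    paths = sorted(glob(pattern))
    selected = [p for p in paths if start <= Path(p).stem <= end]
    return dd.read_parquet(selected, engine="pyarrow")

ddf = load_date_range("20210101", "20210131")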
Upvotes: 1
Reputation: 16581
There are at least two approaches:
1. continue with the current approach of loading everything into dask (using *) and then subset to the required range;
2. load only a specific subset of the data.
For option 2, the user parameters feature of intake is handy. So, assuming that the paths follow /some/data/{YYYYMMDD}.parquet, the modified catalog entry would look like this:
sources:
  source_partitioned:
    parameters:
      date:
        type: str
        default: "*"
    args:
      engine: pyarrow
      urlpath: "/some/data/{{ date }}.parquet"
    description: ''
    driver: intake_parquet.source.ParquetSource
In Python, the date parameter can be provided (as a str in this case) with source = intake.open_catalog('mycat.yaml').source_partitioned(date='20211101') to load a specific date.
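For example (a sketch; .read() materializes a pandas dataframe, while .to_dask() keeps the load lazy):
import intake

cat = intake.open_catalog('mycat.yaml')

# the date parameter fills the {{ date }} placeholder in urlpath
source = cat.source_partitioned(date='20211101')
df = source.read()        # pandas DataFrame for that single day
# ddf = source.to_dask()  # or keep it as a lazy dask dataframe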
For date ranges, things are a bit trickier: one way would be to build a list comprehension over the desired range and concatenate the individually loaded files, but that might not be efficient for large date ranges. In those cases I would load bigger chunks, e.g. by year using date="2017*", and concatenate these larger chunks afterwards, roughly as in the sketch below.
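A sketch of that chunked approach (the list of years is just an example):
import dask.dataframe as dd
import intake

cat = intake.open_catalog('mycat.yaml')

# load year-sized chunks lazily via the date parameter and concatenate them
years = ["2017*", "2018*", "2019*"]
ddf = dd.concat([cat.source_partitioned(date=y).to_dask() for y in years])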
Upvotes: 2