Reputation: 23
I have a parquet file in S3 to which I will be automatically appending additional data every week. The data has timestamps at 5-minute intervals. I do not want to append any duplicate data during my updates, so what I am trying to accomplish is to read ONLY the max/newest timestamp within the data already saved in S3. Then I will make sure that all of the timestamps in the data I am about to append are newer than that time before appending. I don't want to read the entire dataset from S3, in order to keep things fast and preserve memory as the dataset continues to grow.
Here is an example of what I am doing now to read the entire file:
from pyarrow import fs
import pyarrow.parquet as pq

# from_uri() parses the S3 URI into a (filesystem, path) pair
s3, path = fs.S3FileSystem(access_key=access_key, secret_key=secret_key).from_uri(uri)
dataset = pq.ParquetDataset(path, filesystem=s3)
table = dataset.read()
But I am looking for something more like this (I am aware this isn't correct, but hopefully it conveys what I am attempting to accomplish):
max_date = pq.ParquetFile(path, filesystem=s3).metadata.row_group(0).column('timestamp').statistics['max']
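And once I do have that max timestamp, the filtering step I have in mind before appending would look roughly like this (new_table here is just a placeholder for the week's new data):

import pyarrow.compute as pc

# Keep only the rows that are strictly newer than what is already stored in S3
mask = pc.greater(new_table['timestamp'], max_date)
rows_to_append = new_table.filter(mask)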
I am pretty new to using both Pyarrow and AWS, so any help would be fantastic (including alternate solutions to my problem I described).
Upvotes: 1
Views: 2821
Reputation: 43817
From a purely pedantic perspective I would phrase the problem statement a little differently: "I have a parquet dataset in S3 and will be appending new parquet files to it on a regular basis". I only mention that because the pyarrow documentation is written with that terminology in mind (e.g. you cannot append to a parquet file with pyarrow, but you can append to a parquet dataset), so the distinction might help with understanding.
The pyarrow datasets API doesn't have any operations to retrieve dataset statistics today (it might not be a bad idea to request the feature as a JIRA). However, it can help a little in finding your fragments. What you have doesn't seem that far off to me.
from pyarrow import fs
import pyarrow.parquet as pq

s3, path = fs.S3FileSystem(access_key=access_key, secret_key=secret_key).from_uri(uri)

# At this point a call will be made to S3 to list all the files
# in the directory 'path'
dataset = pq.ParquetDataset(path, filesystem=s3)

max_timestamp = None
for fragment in dataset.fragments:
    field_index = fragment.physical_schema.get_field_index('timestamp')
    # This will issue a call to S3 to load the fragment's metadata footer
    metadata = fragment.metadata
    for row_group_index in range(metadata.num_row_groups):
        stats = metadata.row_group(row_group_index).column(field_index).statistics
        # Parquet files can be created without statistics
        if stats and stats.has_min_max:
            row_group_max = stats.max
            if max_timestamp is None or row_group_max > max_timestamp:
                max_timestamp = row_group_max
print(f"The maximum timestamp was {max_timestamp}")
I've annotated the places where actual calls to S3 will be made. This will certainly be faster than loading all of the data, but there is still some overhead, and it will grow as you add more files. That overhead can get quite high if you are running outside of the AWS region that hosts the bucket. You could mitigate this by scanning the fragments in parallel, but that is extra work.
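If you do go that route, here is a rough sketch of scanning the fragment metadata with a thread pool. It assumes the dataset object from above; the fragment_max helper and the max_workers value are just illustrative:

from concurrent.futures import ThreadPoolExecutor

def fragment_max(fragment):
    # Reads one parquet footer from S3 and returns the largest
    # 'timestamp' statistic found in that file (or None if no stats)
    field_index = fragment.physical_schema.get_field_index('timestamp')
    metadata = fragment.metadata
    maxes = []
    for row_group_index in range(metadata.num_row_groups):
        stats = metadata.row_group(row_group_index).column(field_index).statistics
        if stats and stats.has_min_max:
            maxes.append(stats.max)
    return max(maxes) if maxes else None

with ThreadPoolExecutor(max_workers=8) as pool:
    per_fragment_maxes = [m for m in pool.map(fragment_max, dataset.fragments) if m is not None]

max_timestamp = max(per_fragment_maxes) if per_fragment_maxes else None
print(f"The maximum timestamp was {max_timestamp}")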
It would be faster to store the max_timestamp in a dedicated statistics file whenever you update the data in your dataset. That way there is only ever one small file you need to read (sketched below). If you're managing the writes yourself you might look into a table format like Apache Iceberg, which is a standard format for storing this kind of extra information and statistics about a dataset (what Arrow calls a "dataset" Iceberg calls a "table").
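For example, a rough sketch of that statistics-file idea using a small JSON object stored alongside the dataset with the same S3 filesystem (the _stats.json name and the helper functions are just illustrative):

import json

stats_path = path.rstrip('/') + '/_stats.json'   # example sidecar key next to the data files

def write_stats(s3, stats_path, max_timestamp):
    # Overwrite the sidecar with the latest known max timestamp after each append
    with s3.open_output_stream(stats_path) as out:
        out.write(json.dumps({'max_timestamp': str(max_timestamp)}).encode('utf-8'))

def read_stats(s3, stats_path):
    # One small GET instead of touching every parquet footer;
    # the value comes back as a string here, so parse it into a timestamp as needed
    with s3.open_input_stream(stats_path) as src:
        return json.loads(src.read().decode('utf-8'))['max_timestamp']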
Upvotes: 1