Reputation: 1549
Say I have a table called data, and it's a time series. It's stored like this:
/data
  /date=2022-11-30
    /region=usa
      part-000001.parquet
      part-000002.parquet
Here I have two partition keys and two parquet files within that partition. I can easily list the files for the partition keys with:
dbutils.fs.ls('/data/date=2022-11-30/region=usa')
But if I now make an update to the table, it regenerates the parquet files and now I have 4 files in that directory.
How can I retrieve the latest version of the parquet files? Do I really have to loop through all the _delta_log state files and rebuild the state? Or do I have to run VACUUM to clean up the old versions so I can get the most recent files?
There has to be a magic function.
Upvotes: 1
Views: 2600
Reputation: 12672
As Denny pointed out, if you just want to read the latest version of the table, simply open it and Delta will automatically figure out which Parquet files to read based on the transaction log.
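For example, a minimal sketch (assuming the table from your question lives at /data):

# Delta resolves the "active" parquet files from the transaction log,
# so stale files left behind by the update are ignored automatically.
df = spark.read.format("delta").load("/data")
df.where("date = '2022-11-30' AND region = 'usa'").show()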
But to answer your question directly, say you really want to know what files are "active" for the latest version of the table (or any particular version, really).
There are two ways to do this without parsing the transaction log yourself.
The first method is to use the built-in DataFrame method inputFiles():
data = (
    spark.read
    .format("delta")
    # Optionally, load a prior version of the table.
    # .option("versionAsOf", 0)
    .load("/some/path/")
)
data.inputFiles()
This will return a list of paths for the files that make up the DataFrame as of the version you loaded:
[
'/some/path/part-00230-12fe099b-ad58-41dd-a5ca-bdb364534889.c000.snappy.parquet',
'/some/path/part-00095-18ee8194-19f7-4162-a055-53b040789042.c000.snappy.parquet',
...
]
Note that the docstring for this method states:
Returns a best-effort snapshot of the files that compose this DataFrame. ... Depending on the source relations, this may not find all input files.
However, for Delta tables, I believe this should be able to find all input files. The caveat applies more to other types of data sources that may not map neatly to files on a filesystem, like JDBC.
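If you only care about one partition, as in your question, note that inputFiles() returns plain path strings, so a simple (if slightly crude) option is to filter the returned list on the partition directories. A sketch, again assuming the table lives at /data:

# Narrow the active files down to a single partition by matching its
# directory components in the returned paths.
files = spark.read.format("delta").load("/data").inputFiles()
usa_files = [f for f in files if "/date=2022-11-30/region=usa/" in f]
print(usa_files)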
Another method is to use the file_uris() method from the delta-rs project (the deltalake Python package):
import deltalake

table = deltalake.DeltaTable(
    "/some/path",
    # Optionally specify a table version to read.
    # version=0,
)
table.file_uris()
Like with the inputFiles() method, this will return a list of the Parquet files comprising the Delta table at the version you loaded:
[
'/some/path/part-00230-12fe099b-ad58-41dd-a5ca-bdb364534889.c000.snappy.parquet',
'/some/path/part-00095-18ee8194-19f7-4162-a055-53b040789042.c000.snappy.parquet',
...
]
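If I remember correctly, file_uris() also accepts partition filters, which maps nicely onto the layout in your question. Treat the exact argument below as an assumption to verify against the deltalake version you have installed:

import deltalake

table = deltalake.DeltaTable("/data")  # assuming the table root from the question
# partition_filters is, to my knowledge, a list of (column, op, value) tuples.
usa_files = table.file_uris(
    partition_filters=[("date", "=", "2022-11-30"), ("region", "=", "usa")]
)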
Upvotes: 3
Reputation: 3254
Delta Lake itself tracks all of this information in its transaction log. When you query a Delta table with an engine or API that supports Delta Lake, underneath the covers it is reading this transaction log to determine what files make up that version of the table.
For your example, say the four files are:
/data
  /date=2022-11-30
    /region=usa
      part-000001.parquet
      part-000002.parquet
      part-000003.parquet
      part-000004.parquet
The Delta transaction log itself contains the path of the files for each table version, e.g.:
# V0 | first version of the table
/data
  /date=2022-11-30
    /region=usa
      part-000001.parquet
      part-000002.parquet

# V1 | second version of the table
/data
  /date=2022-11-30
    /region=usa
      part-000003.parquet
      part-000004.parquet
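If you are curious what this looks like on disk, each commit is a JSON-lines file under _delta_log containing add and remove actions. Here is a rough sketch of peeking at them with plain Python (paths assumed from the layout above; on Databricks you may need the /dbfs/... local prefix):

import json
import os

# Print which data files each commit adds or removes.
log_dir = "/data/_delta_log"
for name in sorted(os.listdir(log_dir)):
    if not name.endswith(".json"):
        continue
    with open(os.path.join(log_dir, name)) as fh:
        for line in fh:
            action = json.loads(line)
            if "add" in action:
                print(name, "ADD   ", action["add"]["path"])
            elif "remove" in action:
                print(name, "REMOVE", action["remove"]["path"])

That said, you should not need to do this yourself; the Delta libraries replay the log for you.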
You can use Delta Standalone if you want to get the list of files from Scala/JVM, or the Delta Rust project (delta-rs) and its Rust or Python bindings.
If you would like to do it in Spark SQL and/or dive into the details, please check out Diving into Delta Lake: Unpacking the Transaction Log, which includes a video, blog post, and notebook on this topic. There is also a follow-up video called Under the sediments v2.
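For a quick taste of the SQL side (a sketch, not a replacement for the material above), Delta's DESCRIBE HISTORY and DESCRIBE DETAIL commands surface the table versions and file counts, even though they do not list the individual parquet files:

# Table-level metadata via Spark SQL.
spark.sql("DESCRIBE HISTORY delta.`/data`").show()  # one row per table version
spark.sql("DESCRIBE DETAIL delta.`/data`").show()   # includes numFiles for the current version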
Upvotes: 3