Using .vacuum() on a Delta Lake table is very slow (see Delta Lake (OSS) Table on EMR and S3 - Vacuum takes a long time with no jobs).
If I manually deleted the underlying parquet files and did not add a new JSON log file or add a new .checkpoint.parquet file and change the _delta_log/_last_checkpoint file that points to it, what would the negative impacts on the Delta Lake table be, if any?
Obviously time-traveling, i.e. loading a previous version of the table that relied on the parquet files I removed, would not work. What I want to know is, would there be any issues reading, writing, or appending to the current version of the DeltaLake table?
What I am thinking of doing in pySpark:
### Assuming a working SparkSession as `spark`
from subprocess import check_output
import json
from pyspark.sql import functions as F

# Read _last_checkpoint via the AWS CLI to get the latest checkpoint version,
# zero-padded to 20 digits to match the checkpoint file name
awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)

# No trailing slash; the f-string below adds the separator
s3_bucket_path = "s3a://my_s3_bucket/delta"

df_chkpt_del = (
    spark.read.format("parquet")
    .load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
    # Keep only the remove actions recorded in the checkpoint
    .where(F.col("remove").isNotNull())
    .select("remove.*")
    # deletionTimestamp is stored in epoch milliseconds
    .withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp") / 1000))
    .withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
    # Keep only files whose remove action is older than 7 days
    .where(F.col("delDateDiffDays") < -7)
)
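As a quick sanity check before deleting anything, the candidate set built above could be inspected first, for example:

df_chkpt_del.count()
df_chkpt_del.select("path", "deletionTimestamp").show(20, truncate=False)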
There are a lot of options from here. One could be:
df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)
Where I could read files_to_delete.csv into a bash array and then use a simple bash for loop, passing each parquet file's S3 path to an aws s3 rm command to remove the files one by one (a rough Python equivalent is sketched below).
This may be slower than vacuum(), but at least it will not be consuming cluster resources while it is working.
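Here is a rough sketch of that removal loop in Python rather than bash, under the assumption that the path values in the remove actions are relative to the table root (so the table's S3 prefix is prepended); the bucket name and CSV file name are just the placeholders used above:

import csv
from subprocess import run

# Assumed table location; remove-action paths are taken to be relative to this root
table_root = "s3://my_s3_bucket/delta"

with open("files_to_delete.csv") as fh:
    for row in csv.DictReader(fh):  # header row written by to_csv is "path"
        s3_path = f"{table_root}/{row['path']}"
        # Delete one parquet file at a time, off-cluster, via the AWS CLI
        run(["aws", "s3", "rm", s3_path], check=True)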
If I do this, will I also have to either:
1. Create a new _delta_log/000000000000000#####.json file that correctly documents these changes, or
2. Create a new 000000000000000#####.checkpoint.parquet file that correctly documents these changes and change the _delta_log/_last_checkpoint file to point to that checkpoint.parquet file?

The second option would be easier.
However, if there will be no negative effects if I just remove the files and don't change anything in the _delta_log, then that would be the easiest.
TL;DR: answering this question:
If I manually deleted the underlying parquet files and did not add a new JSON log file or add a new .checkpoint.parquet file and change the _delta_log/_last_checkpoint file that points to it, what would the negative impacts on the Delta Lake table be, if any?
Yes, this could potentially corrupt your delta table.
Let me briefly explain how Delta Lake reads a version using the _delta_log.
If you want to read version x, Delta will go through the delta log of every version from 0 up to x and build a running set of parquet files to read. A summary of this process is saved as a .checkpoint file after every 10th version, so that building this running set stays efficient.
Assume:

version 1 log says: add file_1, file_2, file_3
version 2 log says: delete file_1, file_2 and add file_4

So when reading version 2, the combined instructions are:

add file_1, file_2, file_3 -> delete file_1, file_2 and add file_4

So the resulting files to read are file_3 and file_4.
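To make the replay concrete, here is a minimal sketch of that folding process over the JSON commit files; it is not the real Delta implementation, it ignores checkpoints, and the directory and version arguments are just placeholders:

import json
from pathlib import Path

def live_files(delta_log_dir, version):
    """Replay commits 0..version and return the set of data files still live."""
    files = set()
    for v in range(version + 1):
        commit = Path(delta_log_dir) / f"{v:020d}.json"
        with open(commit) as fh:
            for line in fh:  # one JSON action per line
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files

Any file that is still in the returned set but missing from storage is exactly the failure case described next.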
Say you then delete file_4 from the file system by hand. If you don't go through .vacuum (which only removes files that are no longer referenced by the log), the delta log will not know that file_4 is missing; it will still try to read it and the read will fail.