Clay

Reputation: 2736

Can underlying parquet files be deleted without negatively impacting DeltaLake _delta_log

Using .vacuum() on a DeltaLake table is very slow (see Delta Lake (OSS) Table on EMR and S3 - Vacuum takes a long time with no jobs).
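For reference, this is roughly the vacuum call I mean (a sketch using the DeltaTable API from the delta package; the 168-hour retention is just the default 7 days written out):

from delta.tables import DeltaTable

# Sketch only: same table path as below; 168 hours = 7 days retention
delta_table = DeltaTable.forPath(spark, "s3a://my_s3_bucket/delta/")
delta_table.vacuum(168)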

If I manually deleted the underlying parquet files, and did not add a new JSON log file or a new .checkpoint.parquet file (and did not update the _delta_log/_last_checkpoint file that points to it), what would the negative impacts on the DeltaLake table be, if any?

Obviously time-traveling, i.e. loading a previous version of the table that relied on the parquet files I removed, would not work. What I want to know is, would there be any issues reading, writing, or appending to the current version of the DeltaLake table?

What I am thinking of doing in pySpark:

### Assuming a working SparkSession as `spark`

from subprocess import check_output
import json
from pyspark.sql import functions as F

# Read _delta_log/_last_checkpoint to find the latest checkpoint version,
# zero-padded to 20 digits to match the checkpoint file name
awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get("version")).zfill(20)

s3_bucket_path = "s3a://my_s3_bucket/delta"

# Load the checkpoint parquet, keep only the `remove` actions (files no longer
# part of the current table version), and restrict to files removed more than
# 7 days ago
df_chkpt_del = (
    spark.read.format("parquet")
    .load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
    .where(F.col("remove").isNotNull())
    .select("remove.*")
    .withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp") / 1000))
    .withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
    .where(F.col("delDateDiffDays") < -7)
)

There are a lot of options from here. One could be:

df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)

I could then read files_to_delete.csv into a bash array and use a simple bash for loop, passing each parquet file's S3 path to an aws s3 rm command to remove the files one by one.

This may be slower than vacuum(), but at least it would not consume cluster resources while it runs.
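If I wanted to stay in Python rather than bash, the same loop could be sketched roughly like this (bucket and prefix are illustrative, and it assumes the path values in the checkpoint's remove actions are relative to the table root, as they usually are):

# Sketch only: a Python version of that deletion loop using boto3.
# Assumes files_to_delete.csv holds the `path` column exported above.
import csv
import boto3

s3 = boto3.client("s3")
bucket = "my_s3_bucket"       # illustrative
table_prefix = "delta/"       # table root inside the bucket

with open("files_to_delete.csv") as f:
    for row in csv.DictReader(f):
        s3.delete_object(Bucket=bucket, Key=table_prefix + row["path"])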

If I do this, will I also have to either:

  1. write a new _delta_log/000000000000000#####.json file that correctly documents these changes?
  2. write a new 000000000000000#####.checkpoint.parquet file that correctly documents these changes and change the _delta_log/_last_checkpoint file to point to that checkpoint.parquet file?

The second option would be easier.

However, if there are no negative effects from just removing the files and not changing anything in the _delta_log, that would be the easiest.

Upvotes: 1

Views: 2053

Answers (1)

Samir Vyas

Reputation: 451

TL;DR: answering this question:

If I manually deleted the underlying parquet files, and did not add a new JSON log file or a new .checkpoint.parquet file (and did not update the _delta_log/_last_checkpoint file that points to it), what would the negative impacts on the DeltaLake table be, if any?

Yes, this could potentially corrupt your delta table.

Let me briefly explain how Delta Lake reads a version using the _delta_log.

If you want to read version x, Delta Lake will go through the log entries of every version up to and including x and build a running sum of which parquet files to read. A summary of this running sum is saved as a .checkpoint file every 10th version, so the whole chain of log files does not have to be replayed from the beginning each time.

What do I mean by this running sum?

Assume:

version 1 log says: add file_1, file_2, file_3
version 2 log says: delete file_1, file_2, and add file_4

So when reading version 2, the combined instructions will be: add file_1, file_2, file_3 -> delete file_1, file_2, and add file_4.

So the resulting files read will be file_3 and file_4.
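In code terms, this reconstruction is just a set that accumulates add actions and drops remove actions. A toy sketch of the example above (not Delta's actual implementation; real log entries are JSON actions):

# Toy illustration of the running sum of add/remove actions
log = {
    1: [("add", "file_1"), ("add", "file_2"), ("add", "file_3")],
    2: [("remove", "file_1"), ("remove", "file_2"), ("add", "file_4")],
}

def active_files(version):
    files = set()
    for v in range(1, version + 1):
        for action, path in log.get(v, []):
            if action == "add":
                files.add(path)
            else:
                files.discard(path)
    return files

print(active_files(2))  # {'file_3', 'file_4'}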

What if you delete a parquet file from the file system yourself?

Say that after version 2 you delete file_4 directly from the file system instead of going through .vacuum (which only deletes files the log has already marked as removed). The Delta log will not know that file_4 is no longer present; it will still try to read it, and the read will fail.

Upvotes: 0
