aweis

Reputation: 5596

Spark dataframe checkpoint cleanup

I have a dataframe in Spark where an entire partition from Hive has been loaded, and I need to break the lineage to overwrite the same partition after some modifications to the data. However, when the Spark job is done, I am left with the checkpoint data on HDFS. Why does Spark not clean this up by itself, or is there something I am missing?

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.table("db.my_table").filter(col("partition") === 2)

// ... transformations to the dataframe

val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")

After this, I have this file on HDFS:

/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000

And each time I run the Spark job, I just get a new directory with a new unique ID containing files for each RDD that was in the dataframes.

Upvotes: 8

Views: 17711

Answers (1)

ggeop

Reputation: 1375

Spark has an implicit mechanism for cleaning up checkpoint files.

Add this property to spark-defaults.conf:

spark.cleaner.referenceTracking.cleanCheckpoints  true   # default is false
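
If you prefer to set it in code instead of spark-defaults.conf, the property can be supplied when the session is built; a minimal Scala sketch (as far as I know, the cleaner reads this flag when the SparkContext is created, so it should be set at build time rather than via spark.conf.set on a running session):

import org.apache.spark.sql.SparkSession

// The ContextCleaner picks this flag up when the SparkContext starts, so it
// is passed to the builder instead of being set on an existing session.
val spark = SparkSession.builder()
  .appName("checkpoint-cleanup")
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")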

You can find more about Spark configuration on the official Spark configuration page.

If you want to remove the checkpoint directory yourself, you can delete it at the end of your script; in Python, for example, you could use shutil.rmtree on a local checkpoint directory.
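
Note that shutil.rmtree only works on local filesystem paths; for a checkpoint directory that actually lives on HDFS, something like the following Scala sketch using the Hadoop FileSystem API (which ships with Spark) should work. The path is the one from the question, and spark is assumed to be the active session:

import org.apache.hadoop.fs.Path

// Recursively delete the checkpoint directory once the job has finished.
// Resolving the FileSystem from the job's Hadoop configuration makes this
// work for HDFS paths as well as local ones.
val checkpointPath = new Path("/home/user/checkpoint/")
val fs = checkpointPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.delete(checkpointPath, true) // 'true' means recursive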

With spark.cleaner.referenceTracking.cleanCheckpoints set to true, the cleaner is allowed to remove old checkpoint files inside the checkpoint directory.

Upvotes: 14
