Reputation: 5596
I have a dataframe in Spark into which an entire partition from Hive has been loaded, and I need to break the lineage so I can overwrite the same partition after some modifications to the data. However, when the Spark job is done I am left with the checkpoint data on HDFS. Why does Spark not clean this up by itself, or is there something I am missing?
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.table("db.my_table").filter(col("partition") === 2)
// ... transformations to the dataframe
val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")
After this I have this file on HDFS:
/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000
And each time I run the Spark job I just get a new directory with a new unique ID, containing files for each RDD that was checkpointed from the dataframes.
Upvotes: 8
Views: 17711
Reputation: 1375
Spark has an implicit mechanism for cleaning up checkpoint files.
Add this property to spark-defaults.conf:
spark.cleaner.referenceTracking.cleanCheckpoints true   # default is false
You can find more about this setting on the official Spark configuration page.
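Note that this is a driver-side cleaner setting, so it has to be in place before the SparkContext is created. A minimal sketch of setting it programmatically when building the session (the app name is illustrative):

import org.apache.spark.sql.SparkSession

// The cleaner property must be set before the SparkContext starts,
// so it cannot be toggled on an already-running session.
val spark = SparkSession.builder()
  .appName("overwrite-partition-job") // illustrative name
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()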
If you want to remove the checkpoint directory yourself, you can delete it at the end of your script, for example with Python's shutil.rmtree for a local checkpoint directory.
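For a checkpoint directory that lives on HDFS, as in the question, a minimal sketch of the same manual cleanup from the Scala job itself could use the Hadoop FileSystem API (the path below is the one from the question and is assumed to be on the default filesystem):

import org.apache.hadoop.fs.{FileSystem, Path}

// Checkpoint root from the question; adjust to your own directory.
val checkpointRoot = new Path("/home/user/checkpoint/")

// FileSystem bound to the Hadoop configuration of the running Spark job.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Recursively delete the checkpoint directory once the write has finished.
if (fs.exists(checkpointRoot)) {
  fs.delete(checkpointRoot, true) // true = recursive
}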
Setting spark.cleaner.referenceTracking.cleanCheckpoints to true allows the cleaner to remove old checkpoint files inside the checkpoint directory.
Upvotes: 14