Reputation: 103
How can I delete old data created by Spark Structured Streaming (Spark 2.4.5)?
I have data on HDFS in Parquet/Avro format (not Delta) that is created by Spark Structured Streaming and partitioned by time (year, month, day of month, hour).
The data is created as follows:
query = df.writeStream \
    .format("avro") \
    .partitionBy("year", "month", "day", "hour") \
    .outputMode("append") \
    .option("checkpointLocation", "/data/avro.cp") \
    .start("/data/avro")
As a result I have the following partition folder layout:
./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16
How can I delete old data, for example anything older than year=2020, month=3, day=13, hour=14?
Just deleting the relevant folders
./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
throws an exception when reading a batch DataFrame from the file system:
df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist
As I've figured out, that is somehow related to the _spark_metadata folder that is used by checkpoints.
I'd appreciate your help.
Upvotes: 1
Views: 4943
Reputation: 36
For Spark 3.0.0 and above, this has been implemented.
Basically, it adds three strategies for committed files (ARCHIVE, DELETE, OFF) and lets you simply configure one of them.
Honestly, I have never tried it myself, but I see some answers here for Spark 3+, so it is definitely worth mentioning.
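If this refers to the cleanSource option that Spark 3.0 added for file stream sources (my assumption; the answer above doesn't name it), a minimal sketch of configuring it could look like the following. The schema and paths are illustrative placeholders, and note that cleanSource cleans up completed input files on the source side.

import org.apache.spark.sql.types._

// Hypothetical schema; streaming file sources need an explicit schema.
val eventSchema = new StructType()
  .add("id", LongType)
  .add("payload", StringType)

val cleanedSource = spark.readStream
  .schema(eventSchema)
  .format("avro")
  .option("cleanSource", "archive")                  // or "delete" / "off" (the default)
  .option("sourceArchiveDir", "/data/avro-archive")  // required when cleanSource = "archive"
  .load("/data/avro")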
Upvotes: 0
Reputation: 103
It seems that I found a solution/workaround.
The key concept is to use FileStreamSinkLog and update it with SinkFileStatus entries whose action is set to delete:
1. Load the FileStreamSinkLog:
   sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
2. Get the latest SinkFileStatus:
   Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
   long batchId = (long) latest.get()._1;
   SinkFileStatus[] fileStatuses = latest.get()._2;
3. Delete the old files.
4. Add new entries with the delete action to the fileStatuses array.
5. Write the batchId log file back with the updated fileStatuses.
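A rough sketch of these steps in Scala, assuming the internal Spark 2.4.x FileStreamSinkLog / SinkFileStatus API (which may change between versions); the metadata path and the partition filter below are illustrative placeholders:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

// 1. Load the sink log from the sink's _spark_metadata directory (assumed location).
val sinkLog = new FileStreamSinkLog(1, spark, "/data/avro/_spark_metadata")

// 2. Get the latest batch id and its file statuses.
val (batchId, fileStatuses) = sinkLog.getLatest().get

// 3. Pick the entries to expire (hypothetical predicate) and delete the files from HDFS.
val expired = fileStatuses.filter(_.path.contains("/day=13/hour=12/"))
val fs = new Path("/data/avro").getFileSystem(spark.sparkContext.hadoopConfiguration)
expired.foreach(s => fs.delete(new Path(s.path), false))

// 4 + 5. Append a new batch that marks those entries with the delete action.
sinkLog.add(batchId + 1, expired.map(_.copy(action = FileStreamSinkLog.DELETE_ACTION)))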
However, this requires that the structured streaming job be stopped. So there is no way to delete old files written by Spark Structured Streaming without stopping it.
Upvotes: 4
Reputation: 21
For ease of copy/paste, here's a working Scala snippet as of Spark 3.0.1. It deletes one file and writes a new batch:
import org.apache.spark.sql.execution.streaming.FileStreamSinkLog
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try

val sinkLog = new FileStreamSinkLog(
  1,
  spark,
  SPARK_METADATA_ROOT  // path to the sink's _spark_metadata directory
)

// Take one committed file from the sink log (here simply the first entry).
val head = sinkLog.allFiles().head

// Physically remove the file from HDFS.
val processLogger = ProcessLogger(out => println(out), err => println(err))
val deleteCommand = s"hadoop fs -rm ${head.path}"
println(Try(deleteCommand ! processLogger).getOrElse(s""""$deleteCommand" failed"""))

// Record the deletion as a new batch in the sink log.
val latestBatch = sinkLog.getLatest()
sinkLog.add(
  latestBatch.get._1 + 1,
  Array(head.copy(action = FileStreamSinkLog.DELETE_ACTION))
)
Upvotes: 0
Reputation: 2466
You can't delete those folders unless you also delete the corresponding checkpoint data. You are trying to delete folders that the checkpoint still knows about, which is why the error occurs.
However, I really wouldn't recommend messing with the checkpoint folder unless necessary. If it's possible in your situation, I'd suggest instead moving your old data to a different storage tier, such as AWS S3 Standard -> Glacier.
Upvotes: 1