Yurii Oleynikov

Reputation: 103

How to delete old data that was created by Spark Structured Streaming?

How can I delete old data created by Spark Structured Streaming (Spark 2.4.5)?

I have data on HDFS in Parquet/Avro format (not Delta) that is created by Spark Structured Streaming and partitioned by time (year, month, day of month, hour).

The data is created as follows:

query = df.writeStream.format("avro").partitionBy("year", "month", "day", "hour").outputMode("append").option("checkpointLocation", "/data/avro.cp").start("/data/avro")

As a result I have the following partition folder layout:

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16

How can I delete old data, for example older than year=2020, month=3, day=13, hour=14?

Just deleting the relevant folders

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13

throws an exception when reading a batch DataFrame from the file system:

df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist

As far as I've figured out, this is somehow related to the _spark_metadata folder that is used by checkpoints.

I'd appreciate your help.

Upvotes: 1

Views: 4943

Answers (4)

Artem Shutak

Reputation: 36

For Spark 3.0.0 and above, this has been implemented.

Basically it adds 3 strategies for committed files (ARCHIVE, DELETE, OFF) and lets you simply configure it.

Honestly, I have never tried it myself, but I see some answers here for Spark 3+ and it is definitely worth mentioning.
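
A minimal sketch of what this might look like, assuming the answer refers to the cleanSource option of the file stream source added in Spark 3.0, which archives or deletes input files after they have been processed (the input paths and inputSchema below are illustrative placeholders, not from the question):

    import org.apache.spark.sql.types.StructType

    // hypothetical schema; file stream sources require an explicit schema
    val inputSchema: StructType = spark.read.format("avro").load("/data/input").schema

    val streamingDf = spark.readStream
      .format("avro")
      .schema(inputSchema)
      .option("cleanSource", "archive")                  // one of "archive", "delete", "off" (default)
      .option("sourceArchiveDir", "/data/input-archive") // required when cleanSource = "archive"
      .load("/data/input")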

Upvotes: 0

Yurii Oleynikov

Reputation: 103

It seems that I found a solution/workaround. The key concept is to use FileStreamSinkLog and update it with SinkFileStatus entries whose action is set to delete:

  1. load FileStreamSinkLog

     sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
    
  2. get latest SinkFileStatus

     Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
     long batchId = (long)latest.get()._1;
     SinkFileStatus[] fileStatuses = latest.get()._2;
    
  3. delete old files

  4. Add a new entry with the delete action to the fileStatuses array

  5. Write the batchId log file back with the updated fileStatuses (see the Scala sketch below)
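
A minimal Scala sketch of the steps above, assuming the streaming query is stopped; the hour=12/hour=13 path filter is only an illustration, and the updated statuses are written under a new batch id here:

    import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

    // step 1: load the sink log from the _spark_metadata directory
    val sinkLog = new FileStreamSinkLog(1, spark, "/data/avro/_spark_metadata")

    // step 2: latest committed batch id and its file statuses
    val (batchId, fileStatuses) = sinkLog.getLatest().get

    // step 3 happens outside Spark, e.g. hadoop fs -rm -r on the old partition folders

    // step 4: mark the removed files with the delete action
    val updated = fileStatuses.map { status =>
      if (status.path.contains("/hour=12/") || status.path.contains("/hour=13/"))
        status.copy(action = FileStreamSinkLog.DELETE_ACTION)
      else
        status
    }

    // step 5: write the updated statuses back as a new batch
    sinkLog.add(batchId + 1, updated)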

However, this requires that the Structured Streaming job be stopped. So there's no way to delete old files written by Spark Structured Streaming without stopping it.

Upvotes: 4

Mark Hanson

Reputation: 21

For ease of copy/paste, here's a working Scala code snippet as of Spark 3.0.1. It deletes one file and writes a new batch:

import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try

val sinkLog = new FileStreamSinkLog(
  1,
  spark,
  SPARK_METADATA_ROOT // path to the sink's _spark_metadata directory
)

// first committed file recorded in the sink log
val head = sinkLog.allFiles().head

// physically remove the file from HDFS
val deleteCommand = s"hadoop fs -rm ${head.path}"
val processLogger = ProcessLogger(line => println(line))
println(Try(deleteCommand ! processLogger).getOrElse(s""""$deleteCommand" failed"""))

// record the deletion in the sink log by writing a new batch
val latestBatch = sinkLog.getLatest()
sinkLog.add(
  latestBatch.get._1 + 1,
  Array(head.copy(action = FileStreamSinkLog.DELETE_ACTION))
)

Upvotes: 0

Zack

Reputation: 2466

You can't delete that folder unless you delete its corresponding checkpoint folders too. You are trying to delete the folder while the checkpoint still has knowledge of it, which is why the error occurs.

However, I really wouldn't recommend messing with the checkpoint folder unless necessary. If it's possible in your situation, I'd suggest instead moving your old data to a different storage tier, such as from AWS S3 Standard to Glacier.

Upvotes: 1
