Theo

Reputation: 631

How to do proper housekeeping of partitioned parquet files generated from Spark Streaming

My Spark Structured Streaming job continuously generates parquet files which I want to delete after they expire (let's say after 30 days).

I store my parquet data partitioned by the event date as the partition key, in RFC3339/ISO8601 format, so that housekeeping can be done fairly easily at the HDFS level via a cron job (delete all parquet folders with partition key < oldestAllowedAge in terms of string comparison).
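To make that concrete, the cleanup is essentially equivalent to the following sketch (the path, partition column name and cutoff are illustrative placeholders, not my actual job):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  object PartitionCleanup {
    def main(args: Array[String]): Unit = {
      // Hypothetical values; the real job would take them from configuration.
      val basePath = new Path("hdfs:///data/events")
      val oldestAllowedPartitionAge = "2019-01-01" // ISO8601, so string order == date order

      val fs = basePath.getFileSystem(new Configuration())

      // Partition directories are named like partitionKey=2018-12-31
      fs.listStatus(basePath)
        .filter(_.isDirectory)
        .map(_.getPath)
        .filter { dir =>
          val name = dir.getName
          name.startsWith("partitionKey=") &&
            name.stripPrefix("partitionKey=") < oldestAllowedPartitionAge
        }
        .foreach(expired => fs.delete(expired, true)) // recursive delete of the partition folder
    }
  }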

However, since I introduced Spark Structured Streaming, Spark writes metadata to a folder named _spark_metadata next to the written data itself. If I now simply delete the expired HDFS files and run a Spark batch job on the entire dataset, the job fails with file-not-found errors: the batch job reads the metadata and expects the already deleted files to exist.

The easy solution is to just disable the creation of the _spark_metadata directory, as described here: disabling _spark_metadata in Structured streaming in spark 2.3.0. But as I don't want to lose read performance for my regular batch analysis, I wonder whether there isn't a better solution.

I thought I could then just use Spark for the deletion, so that it deletes the parquet HDFS files AND updates the metadata. However, simply performing a

session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < '%s'", path.toString(), oldestAllowedPartitionAge));

doesn't work. DELETE sadly is an unsupported operation in Spark...

Is there any solution so that I can delete the old data but still have the _spark_metadata folder working?

Upvotes: 4

Views: 3090

Answers (3)

Shubham Saxena

Reputation: 65

As far as I understand it, one has three options to deal with this:

1) Use spark.read.load(filePathsUsingGlobRegex) to load only the files that need to be read; that way Spark doesn't need to load all the files and hence doesn't need _spark_metadata (see the sketch after this list).

Pros: You still get the benefit of spark_metadata (reads are faster, exactly-once semantics is still ensured)

Cons: You have to construct the paths to the files yourself, which can get messier if you have data stored under a variety of partitioning strategies.

2) Don't create _spark_metadata in the output directory at all, as described in: disabling _spark_metadata in Structured streaming in spark 2.3.0

Pros: Cleaning up is simple

Cons: You lose the benefit of spark_metadata.

3) Understand and update the _spark_metadata file while you delete the older files.

Pros: You get both retention and the _spark_metadata benefits working together.

Cons: You have to manually change _spark_metadata, which can be tough/messy code to maintain, given that it is Spark-internal and can change.
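A rough sketch of option 1, assuming the partition layout from the question (partitionKey=<ISO date>) and illustrative path/retention values: build the list of partitions still within retention yourself and read them directly, so Spark never consults _spark_metadata (the metadata log is only checked when reading a single root path):

  import java.time.LocalDate
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()

  // Hypothetical values, not taken from the question.
  val basePath = "hdfs:///data/events"
  val retentionDays = 30

  // Enumerate the partition directories that are still within retention.
  // (In practice you may want to keep only the paths that actually exist.)
  val validPaths = (0 until retentionDays).map { daysAgo =>
    s"$basePath/partitionKey=${LocalDate.now().minusDays(daysAgo)}"
  }

  val df = spark.read
    .option("basePath", basePath) // keep partitionKey available as a column
    .parquet(validPaths: _*)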

Upvotes: 0

Jungtaek Lim

Reputation: 1708

This is actually one of the known issues in Structured Streaming (SPARK-24295), though it only occurs with a massive number of input files, and end users are applying their own workarounds. For example: stop the query -> remove old input files -> manipulate the metadata manually to purge them -> restart the query.

Given that manually manipulating the metadata is neither trivial nor ideal (it requires stopping the streaming query and forces end users to understand the format of the metadata), SPARK-27188 is proposed as an alternative: it applies retention and purges outdated input files from the metadata.
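For reference, such a manual purge could look roughly like the sketch below, run only while the streaming query is stopped. It assumes the sink log's v1 layout (a version header line followed by one JSON entry per written file), which is Spark-internal and may change; the paths and the partition value are hypothetical:

  import scala.io.Source
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Hypothetical locations; adjust to your layout.
  val metadataDir = new Path("hdfs:///data/events/_spark_metadata")
  val expiredPartition = "partitionKey=2018-12-01" // the partition whose files were deleted

  val fs = metadataDir.getFileSystem(new Configuration())

  // Rewrite every batch/compact log file, dropping entries that point at deleted files.
  fs.listStatus(metadataDir).map(_.getPath).foreach { logFile =>
    val in = fs.open(logFile)
    val lines = Source.fromInputStream(in, "UTF-8").getLines().toList
    in.close()

    if (lines.nonEmpty) {
      // First line is the version header (e.g. "v1"); the rest are JSON file entries.
      val kept = lines.head +: lines.tail.filterNot(_.contains(expiredPartition))
      val out = fs.create(logFile, true) // overwrite in place
      out.write((kept.mkString("\n") + "\n").getBytes("UTF-8"))
      out.close()
    }
  }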

Upvotes: 1

Bartosz Konieczny

Reputation: 2033

As far as I understand, the main purpose of _spark_metadata was to ensure fault tolerance and to avoid listing all the files to process:

In order to correctly handle partial failures while maintaining exactly once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log. When a parquet based DataSource is initialized for reading, we first check for this log directory and use it instead of file listing when present.

https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de

The link you quoted (disabling _spark_metadata in Structured streaming in spark 2.3.0) explains that the problem there came from an inconsistent checkpoint state: the checkpoint generated metadata, but the user later removed it manually, and when they restarted the query it failed because the checkpoint expected the metadata file to exist.

To see whether the lack of metadata will fail your batch processing, please take a look at the org.apache.spark.sql.execution.datasources.DataSource#resolveRelation method, where you can find a pattern match with 2 cases:

  // We are reading from the results of a streaming query. Load files from the metadata log
  // instead of listing them using HDFS APIs.
  case (format: FileFormat, _)
      if FileStreamSink.hasMetadata(
        caseInsensitiveOptions.get("path").toSeq ++ paths,
        sparkSession.sessionState.newHadoopConf()) =>
    // ... builds the relation from the _spark_metadata log ...

  case (format: FileFormat, _) =>
    val globbedPaths =
      checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
    // ... builds the relation by globbing and listing the given paths ...

And the hasMetadata method looks like this:

  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
    path match {
      case Seq(singlePath) =>
        try {
          val hdfsPath = new Path(singlePath)
          val fs = hdfsPath.getFileSystem(hadoopConf)
          if (fs.isDirectory(hdfsPath)) {
            fs.exists(new Path(hdfsPath, metadataDir))
          } else {
            false
          }
        } catch {
          case NonFatal(e) =>
            logWarning(s"Error while looking for metadata directory.")
            false
        }
      case _ => false
    }
  }

As you can see, there is no risk of failure (at least by reading the code!). If you do hit one, please give more context, because the problem may be elsewhere.

About your performance concern: _spark_metadata contains only a file listing, so without it Spark first needs to list the files in your input directory itself. From my experience this is not the most costly operation. For instance, listing a directory with 1297 files on AWS S3 takes approximately 9 seconds. After that, it's up to you to decide whether you would like a simple cleaning process or slightly slower batch processing. If you have many more files than that, maybe you should also group them into bigger ones, like 256 MB or more?
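For that last point, a compaction pass can be as simple as the sketch below (the paths and the target file count are illustrative; note that the compacted copy is written to a new location, since overwriting the directory you are reading from is unsafe):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()

  // Hypothetical paths: compact one day's partition into fewer, larger files.
  val src = "hdfs:///data/events/partitionKey=2019-01-15"
  val dst = "hdfs:///data/events_compacted/partitionKey=2019-01-15"

  spark.read.parquet(src)
    .coalesce(4) // pick the output file count from the target size, e.g. ~256 MB per file
    .write
    .parquet(dst)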

Still, if you want to keep _spark_metadata, maybe there is a way to remove the files from within your cleaning app. But it will be challenging, since you will have 2 apps (streaming and cleaning) working on the same data.

You can find more information about _spark_metadata here: How to change the location of _spark_metadata directory?

Upvotes: 3
