Hadooper1988

Reputation: 63

In Spark Streaming, how to process old data and delete processed data

We are running a Spark Streaming job that retrieves files from a directory (using textFileStream). One concern we have is the case where the job is down while files are still being added to the directory. Once the job starts up again, those files are not picked up (since they were not added or changed while the job was running), but we would like them to be processed.

1) Is there a solution for that? Is there a way to keep track what files have been processed and can we "force" older files to be picked up?

2) Is there a way to delete the processed files?

Upvotes: 3

Views: 4634

Answers (3)

Mr AK

Reputation: 63

To answer your second question: this is now possible in Spark 3. You can use the "cleanSource" option of readStream, as in the sketch below.
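A minimal sketch of what that looks like (Spark 3.x Structured Streaming); "spark" is assumed to be an existing SparkSession and the directory paths are placeholders:

    // Completed source files are archived (or deleted) after processing.
    val files = spark.readStream
      .format("text")
      .option("cleanSource", "archive")            // "archive", "delete" or "off"
      .option("sourceArchiveDir", "/data/archive") // required when cleanSource = "archive"
      .load("/data/incoming")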

Thanks to the documentation https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html and this video https://www.youtube.com/watch?v=EM7T34Uu2Gg.

After searching for many hours, I finally found this solution.

Upvotes: 1

Victor

Reputation: 2546

Is there a way to delete the processed files?

In my experience, I couldn't get the checkpointing feature to work, so I had to delete/move the processed files that went into each batch.

Getting those files is a bit tricky, but essentially they are ancestors (dependencies) of the current RDD. What I use is a recursive method that crawls that dependency structure and recovers the names of the RDDs that begin with hdfs.

  import org.apache.spark.rdd.RDD

  /**
    * Recursive method to extract the original HDFS files involved in this batch.
    * @param rdd The RDD created for each batch.
    * @return All HDFS files originally read.
    */
  def extractSourceHDFSFiles(rdd: RDD[_]): Set[String] = {

    def extractSourceHDFSFilesWithAcc(rdds: List[RDD[_]]): Set[String] = {
      rdds match {
        case Nil => Set()
        case head :: tail =>
          // An RDD's string representation starts with its name; for file-based
          // RDDs that name is the HDFS path the data was read from.
          val name = head.toString()
          if (name.startsWith("hdfs")) {
            Set(name.split(" ")(0)) ++
              extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList) ++
              extractSourceHDFSFilesWithAcc(tail)
          } else {
            extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList) ++
              extractSourceHDFSFilesWithAcc(tail)
          }
      }
    }

    extractSourceHDFSFilesWithAcc(rdd.dependencies.map(_.rdd).toList)
  }

So, in the foreachRDD method you can easily invoke it:

  stream.foreachRDD { rdd =>
    val filesInBatch = extractSourceHDFSFiles(rdd)
    logger.info(s"Files to be processed: $filesInBatch")

    // Process them

    // Delete them when you are done (see the sketch below)
  }
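For the deletion itself, something along these lines can be used once a batch has finished (my own sketch, not part of the original code; it assumes the entries returned above are plain HDFS URIs and relies on the Hadoop FileSystem API):

    import org.apache.spark.SparkContext
    import org.apache.hadoop.fs.Path

    // Illustrative helper: remove each processed file once the batch has succeeded.
    def deleteProcessedFiles(paths: Set[String], sc: SparkContext): Unit = {
      paths.foreach { p =>
        val path = new Path(p)
        val fs = path.getFileSystem(sc.hadoopConfiguration) // resolve the filesystem from the URI
        fs.delete(path, false)                              // non-recursive: each entry is a single file
      }
    }

Inside foreachRDD that would be deleteProcessedFiles(filesInBatch, rdd.sparkContext), called only after the processing step has completed.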

Upvotes: 0

nkasturi

Reputation: 181

The article below pretty much covers all your questions.

https://blog.yanchen.ca/2016/06/28/fileinputdstream-in-spark-streaming/

1) Is there a solution for that? Is there a way to keep track what files have been processed and can we "force" older files to be picked up?

The stream reader initiates the batch window using the system clock when the job/application is launched, so all files created before that moment are ignored. Try enabling checkpointing.
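For reference, this is roughly what enabling checkpointing looks like with StreamingContext.getOrCreate (a minimal sketch; the directories, app name and batch interval here are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///checkpoints/file-stream"

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("file-stream")
      val ssc = new StreamingContext(conf, Seconds(60))
      ssc.checkpoint(checkpointDir)                           // enable metadata checkpointing
      val lines = ssc.textFileStream("hdfs:///data/incoming")
      lines.foreachRDD { rdd => println(s"lines in batch: ${rdd.count()}") }
      ssc
    }

    // On restart the context is recovered from the checkpoint instead of being rebuilt,
    // so the reader does not simply start a fresh batch window from the current clock time.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()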

2) Is there a way to delete the processed files?

Deleting the files might be unnecessary. If checkpointing works, Spark itself keeps track of which files still have to be processed. If for some reason the files do have to be deleted, implement a custom input format and record reader (please refer to the article) to capture the file name and use that information as appropriate. But I wouldn't recommend this approach.
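If you do need the file names, the general idea the article describes looks roughly like this (a rough sketch only, with illustrative class names; it wraps the standard line reader and prepends each line with the path of the file it came from):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader, TextInputFormat}

    // Input format whose reader tags each line with the path of its source file.
    class FileNameTextInputFormat extends TextInputFormat {
      override def createRecordReader(split: InputSplit,
                                      context: TaskAttemptContext): RecordReader[LongWritable, Text] =
        new FileNameRecordReader
    }

    class FileNameRecordReader extends RecordReader[LongWritable, Text] {
      private val reader = new LineRecordReader()
      private var path: String = _

      override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
        path = split.asInstanceOf[FileSplit].getPath.toString
        reader.initialize(split, context)
      }
      override def nextKeyValue(): Boolean = reader.nextKeyValue()
      override def getCurrentKey: LongWritable = reader.getCurrentKey
      override def getCurrentValue: Text = new Text(s"$path\t${reader.getCurrentValue}")
      override def getProgress: Float = reader.getProgress
      override def close(): Unit = reader.close()
    }

    // Usage with the DStream file API (illustrative):
    //   ssc.fileStream[LongWritable, Text, FileNameTextInputFormat]("hdfs:///data/incoming")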

Upvotes: 2
