coderWorld

Reputation: 642

Check if file exists in HDFS path?

How can I check if a file exists given a certain base path? I am passing the method a file list, for example: file1.snappy, file2.snappy, ...

I need to check whether the file exists in either of the given paths, for example hdfs://a/b/c/source/file1.snappy or hdfs://a/b/c/target/file1.snappy. How can I update/modify the method below to accept /a/b/c/target/ or /a/b/c/source/ as a base path and check if the file exists? If it exists in the source path, add it to a source list; if it exists in the target path, add it to a destination list.

  import org.apache.hadoop.fs.{FileSystem, Path}

  val fs = FileSystem.get(sprk.sparkContext.hadoopConfiguration)

  def fileExists(fileList: Array[String]): Boolean = {
    var fileNotFound = 0
    fileList.foreach { file =>
      if (!fs.exists(new Path(file))) fileNotFound += 1
      print(("fileList", file))
    }
    if (fileNotFound > 0) {
      println(fileNotFound + ": number of files not found, probably moved")
      false
    } else
      true
  }
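
For illustration, this is roughly the split I'm after (just a sketch built on the example base paths above, not code I have working):

  // Sketch only: check each file name against both base paths and
  // collect the names that exist into separate lists.
  val sourceBase = "hdfs://a/b/c/source/"
  val targetBase = "hdfs://a/b/c/target/"

  def splitByLocation(fileList: Array[String]): (Array[String], Array[String]) = {
    val sourceList = fileList.filter(f => fs.exists(new Path(sourceBase + f)))
    val targetList = fileList.filter(f => fs.exists(new Path(targetBase + f)))
    (sourceList, targetList)
  }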

Upvotes: 2

Views: 4633

Answers (2)

Ram Ghadiyaram

Reputation: 29165

I have source and target dirs laid out like the example below.

Try this way for a recursive lookup.

URI.create(...) is very important when you are dealing with s3 objects (it will also work with hdfs / local fs).

import java.net.URI

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

  /**
    * getAllFiles - get all files recursively from the sub folders.
    *
    * @param path String
    * @param sc   SparkContext
    * @return Seq[String]
    */
  def getAllFiles(path: String, sc: SparkContext): Seq[String] = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(URI.create(path), conf)
    val files: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(path), true) // true for recursive lookup
    val buf = new ArrayBuffer[String]
    while (files.hasNext()) {
      val fileStatus = files.next()
      buf.append(fileStatus.getPath().toString)
    }
    buf.toSeq
  }

Example usage:

  val spark: SparkSession = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  val sc = spark.sparkContext

  val myfiles: Seq[String] = getAllFiles("data/test_table", sc)
  myfiles.foreach(println)
  println(myfiles.contains("/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet"))

Result:

/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy.parquet
/data/test_table/target/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1111.parquet
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy11.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy111.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet


true
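
Since the FileSystem is resolved from the URI scheme, the same helper should work unchanged against HDFS or S3 locations as well, for example (the cluster and bucket names below are just placeholders):

// Hypothetical locations; only the scheme/authority changes,
// getAllFiles itself stays the same.
val hdfsFiles = getAllFiles("hdfs://namenode:8020/a/b/c/source", sc)
val s3Files   = getAllFiles("s3a://my-bucket/a/b/c/target", sc)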

Upvotes: 1

s.polam

Reputation: 10372

Updated the code to work for both HDFS & S3.

Please check the code below.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._

// For converting to scala Iterator
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
}

import java.net.URI

def fs(path: String) = FileSystem.get(URI.create(path),spark.sparkContext.hadoopConfiguration)

// Exiting paste mode, now interpreting.

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
fs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@640517de

Sample Directories

scala> "tree /tmp/examples".!
/tmp/examples

0 directories, 0 files

scala> "tree /tmp/sample".!
/tmp/sample
├── aaa
│   └── sample.json
├── bbb
│   └── bbb.json
├── ccc
└── ddd

4 directories, 2 files

Result

scala> List("/tmp/sample","/tmp/examples")
.flatMap(dir => {
    fs(dir)
    .listFiles(new Path(dir),true)
    .toList
    .filter(_.isFile)
    .map(d => (d.getPath.getParent,d.getPath))
// If you want only Boolean values, you could change the above line to `.map(d => (d.getPath.getParent, d.isFile))`
})
.foreach(println)

(/tmp/sample/bbb,file:/tmp/sample/bbb/bbb.json)
(/tmp/sample/aaa,file:/tmp/sample/aaa/sample.json)
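
With the recursive listing above, one way to get the source/target split asked for in the question could look like this (a sketch; the hdfs://a/b/c/source and hdfs://a/b/c/target base dirs are taken from the question):

// Sketch: list both base dirs recursively, then partition the
// full paths into source vs target lists.
val allFiles = List("hdfs://a/b/c/source", "hdfs://a/b/c/target")
  .flatMap(dir => fs(dir).listFiles(new Path(dir), true).toList.map(_.getPath.toString))

val (sourceList, targetList) = allFiles.partition(_.contains("/source/"))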


Upvotes: 3
