Reputation: 642
How can I check whether a file exists under a given base path? I am passing the method a file list, for example: file1.snappy, file2.snappy, ...
I need to check whether each file exists in either of the given paths, for example hdfs://a/b/c/source/file1.snappy
or hdfs://a/b/c/target/file1.snappy
. How can I update/modify the method below to accept /a/b/c/target/
or /a/b/c/source/
as a base path and check whether the file exists? If it exists in source, add it to a sourceList; if it exists in target (the destination), add it to a targetList.
val fs = FileSystem.get(sprk.sparkContext.hadoopConfiguration)

def fileExists(fileList: Array[String]): Boolean = {
  var fileNotFound = 0
  fileList.foreach { file =>
    if (!fs.exists(new Path(file))) fileNotFound += 1
    println(("fileList", file))
  }
  if (fileNotFound > 0) {
    println(fileNotFound + ": number of files not found, probably moved")
    false
  } else true
}
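For illustration, here is a minimal sketch of the kind of change being asked about, reusing the fs handle above; the classifyFiles name, the default base paths, and the ArrayBuffer approach are assumptions for illustration, not a confirmed solution:

import scala.collection.mutable.ArrayBuffer

// Sketch only: the base paths and names below are illustrative assumptions.
def classifyFiles(fileList: Array[String],
                  sourceBase: String = "hdfs://a/b/c/source/",
                  targetBase: String = "hdfs://a/b/c/target/"): (Seq[String], Seq[String]) = {
  val sourceList = ArrayBuffer[String]() // files found under sourceBase
  val targetList = ArrayBuffer[String]() // files found under targetBase
  fileList.foreach { file =>
    if (fs.exists(new Path(sourceBase + file))) sourceList += file
    if (fs.exists(new Path(targetBase + file))) targetList += file
  }
  (sourceList.toSeq, targetList.toSeq)
}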
Upvotes: 2
Views: 4633
Reputation: 29165
I have source and target directories like the example below. Try this approach for a recursive lookup. URI.create(...) is very important when you are dealing with S3 objects (it also works with HDFS and the local file system).
import java.net.URI

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

/**
 * getAllFiles - get all files recursively from the sub folders.
 *
 * @param path String
 * @param sc   SparkContext
 * @return Seq[String]
 */
def getAllFiles(path: String, sc: SparkContext): Seq[String] = {
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(URI.create(path), conf)
  val files: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(path), true) // true enables recursive lookup
  val buf = new ArrayBuffer[String]
  while (files.hasNext) {
    val fileStatus = files.next()
    buf.append(fileStatus.getPath.toString)
  }
  buf.toSeq
}
Example usage:

val spark: SparkSession = SparkSession.builder.appName(getClass.getName)
  .master("local[*]").getOrCreate
val sc = spark.sparkContext

val myfiles: Seq[String] = getAllFiles("data/test_table", sc)
myfiles.foreach(println)
println(myfiles.contains("/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet"))
Result:
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy.parquet
/data/test_table/target/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1111.parquet
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy11.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy111.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet
true
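To connect this back to the source/target split from the question, one possible sketch (the bare file name and the substring-matching rule are assumptions for illustration):

// Sketch: partition bare file names by whether they were found under source/ or target/.
val found: Seq[String] = getAllFiles("data/test_table", sc)
val fileList = Seq("part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet") // example input
val sourceList = fileList.filter(f => found.exists(p => p.contains("/source/") && p.endsWith(f)))
val targetList = fileList.filter(f => found.exists(p => p.contains("/target/") && p.endsWith(f)))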
Upvotes: 1
Reputation: 10372
Updated code to work for HDFS & S3. Please check the code below.
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._

// For converting RemoteIterator to a Scala Iterator
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = remoteIterator.hasNext
    override def next(): T = remoteIterator.next()
  }
  wrapper(remoteIterator)
}

import java.net.URI

def fs(path: String) = FileSystem.get(URI.create(path), spark.sparkContext.hadoopConfiguration)
// Exiting paste mode, now interpreting.
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
fs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@640517de
Sample Directories

scala> "tree /tmp/examples".!
/tmp/examples

0 directories, 0 files

scala> "tree /tmp/sample".!
/tmp/sample
├── aaa
│   └── sample.json
├── bbb
│   └── bbb.json
├── ccc
└── ddd

4 directories, 2 files
Result
scala> List("/tmp/sample", "/tmp/examples")
         .flatMap(dir => {
           fs(dir)
             .listFiles(new Path(dir), true)
             .toList
             .filter(_.isFile)
             .map(d => (d.getPath.getParent, d.getPath))
             // If you want only Boolean values, change the line above to: .map(d => (d.getPath.getParent, d.isFile))
         })
         .foreach(println)
(/tmp/sample/bbb,file:/tmp/sample/bbb/bbb.json)
(/tmp/sample/aaa,file:/tmp/sample/aaa/sample.json)
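With the implicit conversion in scope, the question's per-directory check could be sketched like this (the directory and file names reuse the sample above; present/missing are hypothetical names):

// Sketch: collect file names under one base dir, then test each input name against the set.
val sourceNames = fs("/tmp/sample").listFiles(new Path("/tmp/sample"), true)
  .map(_.getPath.getName).toSet
val fileList = Array("sample.json", "bbb.json", "missing.json") // example input
val (present, missing) = fileList.partition(sourceNames.contains)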
Upvotes: 3