Reputation: 77
I am facing a problem moving files between two HDFS folders in a Spark application. We are using Spark 2.1 with Scala. I imported the org.apache.hadoop.fs package and used its 'rename' method as a workaround, since I couldn't find a method in that package for moving files between HDFS folders. The code is below.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
def move_files(fileName: String, fromLocation: String, toLocation: String, spark: SparkSession): Unit = {
val conf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(conf)
val file_source = new Path(fromLocation + "/" + fileName)
println(file_source)
val file_target = new Path(toLocation + fileName)
println(file_target)
try {
fs.rename(file_source, file_target)
} catch {
case e: Exception => println(e); println("Exception moving files between folders")
}
}
The move_files method is called from another method that contains other application logic; I need to move the required files out of the source directory before proceeding with that logic.
def main(): Unit = {
/*
logic
*/
move_files("abc.xml", "/location/dev/file_folder_source", "/location/dev/file_folder_target", spark)
/*
logic
*/
}
The move_files step executes without any errors, but the file is not moved from the source folder to the target folder. Execution then continues with the rest of the logic, which fails because the bad files are still present in the source folder. Please suggest another way to move files between folders in HDFS, or point out the mistake in the code above.
Upvotes: 1
Views: 7798
Reputation: 1584
The API fs.rename(file_source, file_target) returns a boolean: true means the file was moved successfully, and false means it was not moved.
move_files appears to run successfully because fs.rename does not throw when it is unable to move the file. It simply returns false, and execution continues.
You need to explicitly check the condition in your code.
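One way to make that check explicit is sketched below. The helper name renameOrFail is hypothetical (it is not part of the Hadoop API); it only wraps the existing FileSystem.rename call and turns a false result into an exception, so the surrounding logic cannot silently continue.

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper (not a Hadoop method): fail loudly when rename reports false.
def renameOrFail(fs: FileSystem, source: Path, target: Path): Unit = {
  // rename signals "not moved" through its return value, not through an exception
  val moved = fs.rename(source, target)
  if (!moved) {
    throw new IOException(s"Could not move $source to $target")
  }
}
```

Inside move_files, the existing call could then become renameOrFail(fs, file_source, file_target), and the catch block already in place would report the failure.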
To use the fs.rename API this way, create the target directory first and then pass only the target directory path, like below:
val file_target = new Path(toLocation)
fs.mkdirs(file_target)
fs.rename(file_source, file_target)
Note the line val file_target = new Path(toLocation): it contains only the directory path, not the file name.
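Putting the two points together, a minimal sketch might look like the following. The name moveIntoDir is hypothetical, and the sketch assumes the file system moves a file into an existing directory when that directory is given as the rename target, as described above.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: ensure the target directory exists, then move the file into it.
// Returns the boolean from rename so the caller can check whether the move happened.
def moveIntoDir(fs: FileSystem, source: Path, targetDir: Path): Boolean = {
  // mkdirs also creates missing parents and returns true if the directory already exists
  fs.mkdirs(targetDir)
  // targetDir is a directory path only; the file keeps its original name
  fs.rename(source, targetDir)
}
```

The caller should still test the returned value, for example if (!moveIntoDir(fs, file_source, file_target)) println("File was not moved").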
Upvotes: 1