RSG

Reputation: 77

Move files between HDFS directories as part of a Spark Scala application

I am having trouble moving files between two HDFS folders in a Spark application. We are using Spark 2.1 with Scala. I imported the org.apache.hadoop.fs package and used its 'rename' method as a workaround, since I couldn't find a method for moving files between HDFS folders in that package. The code is below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

def move_files(fileName: String, fromLocation: String, toLocation: String, spark: SparkSession): Unit = {
    val conf = spark.sparkContext.hadoopConfiguration
    val fs = FileSystem.get(conf)

    val file_source = new Path(fromLocation + "/" + fileName)
    println(file_source)
    val file_target = new Path(toLocation + fileName)
    println(file_target)

    try {
        fs.rename(file_source, file_target)
    } catch {
        case e: Exception => println(e); println("Exception moving files between folders")
    }
}

The move_files method is called from another method that contains other application logic; I need to remove the required files from the source directory before proceeding with that logic.

def main(): Unit = {
    /*
    logic
    */
    move_files("abc.xml", "/location/dev/file_folder_source", "/location/dev/file_folder_target", spark)
    /*
    logic
    */
}

The move_files step executes without any errors, but the file is not moved from the source folder to the target folder. Execution then continues with the logic, which fails because of the bad files still present in the source folder. Please suggest another way to move files between HDFS folders, or point out what I am doing wrong in the code above.

Upvotes: 1

Views: 7798

Answers (1)

Amit Kumar

Reputation: 1584

The fs.rename(file_source, file_target) API returns a Boolean: true means the file was moved successfully, false means it was not moved.

move_files appears to run successfully because fs.rename does not fail when it is unable to move the file. It simply returns false and execution continues. You need to check the return value explicitly in your code.
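As a minimal sketch of that check: `moveFile` below is a hypothetical helper name (not part of the Hadoop API) that wraps `fs.rename`, logs when it returns false, and hands the result back to the caller.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: wraps fs.rename and surfaces its Boolean result
// instead of silently discarding it.
def moveFile(fs: FileSystem, source: Path, target: Path): Boolean = {
  val moved = fs.rename(source, target)
  if (!moved) {
    // rename reported failure without throwing; make it visible
    println(s"Could not move $source to $target")
  }
  moved
}
```

The caller can then stop, retry, or throw when the result is false, rather than carrying on with the rest of the logic.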

To use the fs.rename API, create the target directory first and then pass only the target directory path, as below:

val file_target = new Path(toLocation)
fs.mkdirs(file_target)
fs.rename(file_source, file_target)

Note that in the line val file_target = new Path(toLocation), the path contains only the directory, not the file name.
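Putting the steps together, a sketch might look like the following. `moveIntoDir` is a hypothetical helper name, and throwing `IOException` on a false result is a design choice made here, not something fs.rename does itself. It assumes the file system moves a file into an existing directory when that directory is given as the destination, which is how HDFS behaves.

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: moves `fileName` from `fromDir` into `toDir`
// and returns the file's new path.
def moveIntoDir(fs: FileSystem, fileName: String, fromDir: String, toDir: String): Path = {
  val source = new Path(fromDir, fileName) // Path(parent, child) inserts the separator
  val targetDir = new Path(toDir)

  // Create the target directory (and any missing parents) if needed.
  if (!fs.mkdirs(targetDir)) {
    throw new IOException(s"Could not create $targetDir")
  }
  // The destination is an existing directory, so the file is moved into it.
  if (!fs.rename(source, targetDir)) {
    throw new IOException(s"Could not move $source into $targetDir")
  }
  new Path(targetDir, fileName)
}
```

With the names from the question, the call would be `moveIntoDir(fs, "abc.xml", "/location/dev/file_folder_source", "/location/dev/file_folder_target")`, where `fs` comes from `FileSystem.get(spark.sparkContext.hadoopConfiguration)`.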

Upvotes: 1
