Atharv Thakur

Reputation: 701

Rename and move S3 files based on their folder names in Spark Scala

My Spark job writes its output to an S3 folder. I want to move all the files from that output folder to another location, and rename them as I move them.

For example, I have files in S3 folders like this:

[Image: listing of the S3 output folders]

Now I want to rename all the files and put them into another directory. In the new file names, Fundamental.FinancialStatement is constant across all files, and 2017-10-18-0439 is the current date and time.

This is what I have tried so far, but I am not able to get the folder name and loop through all the files:

    import org.apache.hadoop.fs._

    val src = new Path("s3://trfsmallfffile/Segments/output")
    val dest = new Path("s3://trfsmallfffile/Segments/Finaloutput")
    val conf = sc.hadoopConfiguration   // assuming sc = spark context
    val fs = src.getFileSystem(conf)
    //val file = fs.globStatus(new Path("src/DataPartition=Japan/part*.gz"))(0).getPath.getName
    val status = fs.listStatus(src)

    // Print the name of every entry directly under src
    status.foreach(filename => {
      val a = filename.getPath.getName
      println("file name" + a)
    })

This gives me the output below:

    file nameDataPartition=Japan
    file nameDataPartition=SelfSourcedPrivate
    file nameDataPartition=SelfSourcedPublic
    file name_SUCCESS

This gives me the folder details, not the files inside the folders.

This is based on another Stack Overflow answer.

Upvotes: 2

Views: 8064

Answers (2)

Sudarshan kumar

Reputation: 1585

You are getting directories because there is a subdirectory level in S3.

Add /*/* to the path to descend into the subdirectories.

Try this:

    import org.apache.hadoop.fs._

    // The /*/* glob matches the files inside each DataPartition=... subdirectory
    val src = new Path("s3://trfsmallfffile/Segments/Output/*/*")
    val destDir = new Path("s3://trfsmallfffile/Segments/FinalOutput")
    val conf = sc.hadoopConfiguration   // assuming sc = spark context
    val fs = src.getFileSystem(conf)

    val files = fs.globStatus(src)

    for (urlStatus <- files) {
      //println("S3 FILE PATH IS ===:" + urlStatus.getPath)
      // Take the text between "=" and the next "/", e.g. "Japan"
      val partitionName = urlStatus.getPath.toString.split("=")(1).split("/")(0)
      val finalPrefix = "Fundamental.FinancialLineItem.Segments."
      val finalFileName = finalPrefix + partitionName + ".txt"
      val dest = new Path(destDir, finalFileName)
      fs.rename(urlStatus.getPath, dest)
    }
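
The naming step can also be kept separate from the S3 calls so it is easy to check on its own. The following is a minimal sketch, assuming the target name has the layout prefix.partition.timestamp.txt. That layout, the object name RenameSketch and all three helper names are illustrative and do not come from the original post. The date pattern yyyy-MM-dd-HHmm is chosen to match the 2017-10-18-0439 example in the question.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helpers for illustration; none of these names come from the post.
object RenameSketch {
  // Matches ".../DataPartition=<name>/<file>" and captures <name>.
  private val PartitionFile = """.*/DataPartition=([^/]+)/[^/]+""".r

  // Returns the partition name, or None for paths such as ".../_SUCCESS".
  def partitionName(path: String): Option[String] = path match {
    case PartitionFile(name) => Some(name)
    case _                   => None
  }

  // Formats a timestamp like 2017-10-18-0439.
  def stamp(now: LocalDateTime): String =
    now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmm"))

  // Assumed layout: <prefix>.<partition>.<timestamp>.txt
  def targetName(prefix: String, partition: String, now: LocalDateTime): String =
    s"$prefix.$partition.${stamp(now)}.txt"
}
```

For a file under DataPartition=Japan, calling targetName with the prefix Fundamental.FinancialStatement and the time 04:39 on 2017-10-18 would give Fundamental.FinancialStatement.Japan.2017-10-18-0439.txt. The result can then be appended to the destination directory before calling fs.rename.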

Upvotes: 5

Avishek Bhattacharya

Reputation: 6994

This has worked for me in the past:

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}

    val path = "s3://<bucket>/<directory>"
    // Look up the FileSystem for the bucket, then list the directory's entries
    val fs = FileSystem.get(new URI(path), spark.sparkContext.hadoopConfiguration)
    fs.listStatus(new Path(path))

listStatus returns the status of every entry in the S3 directory.
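
Since the question has one more folder level (DataPartition=...), a recursive listing may be closer to what is needed. Here is a minimal sketch using Hadoop's FileSystem.listFiles with the recursive flag set, which returns files only and descends into subdirectories. The object name ListSketch and the helper allFiles are made up for illustration, and the code assumes the Hadoop client libraries are on the classpath, as they are in a Spark shell.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper for illustration; not part of the original answer.
object ListSketch {
  // Collects every file (not directory) under `root`, including subdirectories.
  def allFiles(fs: FileSystem, root: Path): Seq[Path] = {
    val it = fs.listFiles(root, true) // true = recurse into subdirectories
    val out = ArrayBuffer.empty[Path]
    // RemoteIterator is not a Scala Iterator, so drain it with a while loop
    while (it.hasNext) out += it.next().getPath
    out.toSeq
  }
}
```

Calling ListSketch.allFiles(fs, new Path(path)) with the fs and path values from the answer should return the part files inside each partition folder, which can then be renamed one by one.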

Upvotes: 0
