Reputation: 12112
I have a directory structured like so:
temp/Tweets/userId123/Tweets.csv
temp/Tweets/userId456/Tweets.csv
temp/Tweets/userId789/Tweets.csv
temp/Mentions/userId123/Mentions.csv
temp/Mentions/userId456/Mentions.csv
temp/Mentions/userId789/Mentions.csv
.
.
.
The data is structured by the type of data entity, I want to restructure it by the user, like so:
final/userId123/Tweets.csv
final/userId123/Mentions.csv
.
.
final/userId456/Tweets.csv
final/userId456/Mentions.csv
.
.
I've searched Google, Stack Overflow, and the Spark docs but haven't found a way to do this, though I think there should be a way to modify the directory structure. How can I do this?
Upvotes: 1
Views: 1996
Reputation: 61666
You can use the org.apache.hadoop.fs.FileSystem
API from Scala (or Python, or Java; here I'll use Scala):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
First, let's define a helper that lists the userId HDFS folders:
def listFolderNamesInFolder(hdfsPath: String): List[String] =
  FileSystem
    .get(new Configuration())
    .listStatus(new Path(hdfsPath))
    .flatMap(status => if (!status.isFile) Some(status.getPath.getName) else None)
    .toList
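The flatMap over Option in the listing above is a filter-and-map in one pass: directories become Some(name) and survive, files become None and vanish. As a minimal HDFS-free sketch of the same idiom (Entry is a hypothetical stand-in for Hadoop's FileStatus, not part of any API):

```scala
// Hypothetical stand-in for Hadoop's FileStatus, to show the idiom HDFS-free.
case class Entry(name: String, isFile: Boolean)

// Same shape as listFolderNamesInFolder: flatMap over Option keeps
// directory entries (Some) and drops file entries (None) in one pass.
def folderNames(entries: Seq[Entry]): List[String] =
  entries
    .flatMap(e => if (!e.isFile) Some(e.name) else None)
    .toList

val names = folderNames(Seq(
  Entry("userId123", isFile = false),
  Entry("Tweets.csv", isFile = true),
  Entry("userId456", isFile = false)))
// names == List("userId123", "userId456")
```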
Then, let's define two helpers to move HDFS files and create HDFS folders:
def moveFile(oldPath: String, newPath: String): Unit = {
  val fileSystem = FileSystem.get(new Configuration())
  fileSystem.rename(new Path(oldPath), new Path(newPath))
}
def createFolder(hdfsPath: String): Unit =
  FileSystem.get(new Configuration()).mkdirs(new Path(hdfsPath))
Finally, let's loop over the userId folders and move the Tweets and Mentions files into the associated final folders:
def moveTweetFiles(hdfsPath: String): Unit =
  listFolderNamesInFolder(s"$hdfsPath/temp/Tweets").foreach { userid =>
    createFolder(s"$hdfsPath/final/$userid")
    moveFile(
      s"$hdfsPath/temp/Tweets/$userid/Tweets.csv",
      s"$hdfsPath/final/$userid/Tweets.csv")
  }
def moveMentionsFiles(hdfsPath: String): Unit =
  listFolderNamesInFolder(s"$hdfsPath/temp/Mentions").foreach { userid =>
    createFolder(s"$hdfsPath/final/$userid")
    moveFile(
      s"$hdfsPath/temp/Mentions/$userid/Mentions.csv",
      s"$hdfsPath/final/$userid/Mentions.csv")
  }
If your HDFS root folder (the one containing the temp and final folders) is "src/test/resources" (the one I used for testing):
moveTweetFiles("src/test/resources")
moveMentionsFiles("src/test/resources")
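As a side note, moveTweetFiles and moveMentionsFiles differ only in the entity name, so the path bookkeeping can be factored into one pure function that yields (source, destination) pairs. This is a hypothetical refactoring sketch (movePlan and userIdsOf are names I'm introducing, not part of the Hadoop API):

```scala
// Hypothetical refactor: compute every (source, destination) pair for a list
// of entity types. Pure string logic; the actual move still goes through
// createFolder + moveFile (i.e. FileSystem.rename).
def movePlan(root: String,
             entities: Seq[String],
             userIdsOf: String => Seq[String]): Seq[(String, String)] =
  for {
    entity <- entities
    userId <- userIdsOf(entity)
  } yield (
    s"$root/temp/$entity/$userId/$entity.csv", // current location
    s"$root/final/$userId/$entity.csv"         // target location
  )

// Stubbed folder listing for illustration; in real use you would pass
// entity => listFolderNamesInFolder(s"$root/temp/$entity").
val plan = movePlan(
  "src/test/resources",
  Seq("Tweets", "Mentions"),
  _ => Seq("userId123", "userId456"))
// 2 entities x 2 users => 4 moves, each pair fed to createFolder + moveFile.
```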
By the way, FileSystem is already included in Spark's dependencies, so there is no extra dependency to add.
This can be launched as a Spark job via spark-submit, even though no Spark pipeline is involved, or simply run from the spark-shell.
Upvotes: 2