tldr

Reputation: 12112

Spark - how to restructure directories in HDFS

I have a directory structure like so:

temp/Tweets/userId123/Tweets.csv
temp/Tweets/userId456/Tweets.csv
temp/Tweets/userId789/Tweets.csv

temp/Mentions/userId123/Mentions.csv
temp/Mentions/userId456/Mentions.csv
temp/Mentions/userId789/Mentions.csv

.
.
.

The data is structured by the type of data entity; I want to restructure it by user, like so:

final/userId123/Tweets.csv
final/userId123/Mentions.csv
.
.

final/userId456/Tweets.csv
final/userId456/Mentions.csv
.
.

I've been looking around on Google, Stack Overflow, and the Spark docs, but haven't found a way to do this. I think there should be a way to modify the directory structure. How can I do this?

Upvotes: 1

Views: 1996

Answers (1)

Xavier Guihot

Reputation: 61666

You can use the org.apache.hadoop.fs.FileSystem API from Scala, Java, or Python (here I'll use Scala):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

First, let's define a helper to list the user-id folders under a given HDFS path:

// Lists the names of the folders (ignoring plain files) under the given HDFS path
def listFolderNamesInFolder(hdfsPath: String): List[String] =
  FileSystem
    .get(new Configuration())
    .listStatus(new Path(hdfsPath))
    .filter(!_.isFile)
    .map(_.getPath.getName)
    .toList
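
For instance, with the layout from the question, listing the Tweets folder would give something like:

listFolderNamesInFolder("temp/Tweets")
// => List("userId123", "userId456", "userId789")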

Then let's define two helpers, one to move HDFS files and one to create HDFS folders:

// Moves (renames) an HDFS file from one path to another
def moveFile(oldPath: String, newPath: String): Unit = {
  val fileSystem = FileSystem.get(new Configuration())
  fileSystem.rename(new Path(oldPath), new Path(newPath))
}

// Creates an HDFS folder, including any missing parent folders (like mkdir -p)
def createFolder(hdfsPath: String): Unit =
  FileSystem.get(new Configuration()).mkdirs(new Path(hdfsPath))
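
Note that FileSystem.rename returns false rather than throwing in most failure cases. If you'd rather fail loudly, here is a possible variant (the helper name and error message are mine):

// Like moveFile, but raises an error when the rename did not happen
def moveFileOrFail(oldPath: String, newPath: String): Unit = {
  val fileSystem = FileSystem.get(new Configuration())
  val renamed = fileSystem.rename(new Path(oldPath), new Path(newPath))
  require(renamed, s"Could not move $oldPath to $newPath")
}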

Finally, let's loop over the user-id folders and move each Tweets and Mentions file to the associated final folder:

def moveTweetFiles(hdfsPath: String): Unit =
  listFolderNamesInFolder(s"$hdfsPath/temp/Tweets").foreach { userid =>
    createFolder(s"$hdfsPath/final/$userid")
    moveFile(
      s"$hdfsPath/temp/Tweets/$userid/Tweets.csv",
      s"$hdfsPath/final/$userid/Tweets.csv")
  }

def moveMentionsFiles(hdfsPath: String): Unit =
  listFolderNamesInFolder(s"$hdfsPath/temp/Mentions").foreach { userid =>
    createFolder(s"$hdfsPath/final/$userid")
    moveFile(
      s"$hdfsPath/temp/Mentions/$userid/Mentions.csv",
      s"$hdfsPath/final/$userid/Mentions.csv")
  }
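
Since these two functions only differ by the entity name, you could also factor them into one; a sketch, assuming each user folder contains exactly one <entity>.csv file named after its entity:

// Generalized version: moves each user's <entity>.csv to its final folder
def moveEntityFiles(hdfsPath: String, entity: String): Unit =
  listFolderNamesInFolder(s"$hdfsPath/temp/$entity").foreach { userid =>
    createFolder(s"$hdfsPath/final/$userid")
    moveFile(
      s"$hdfsPath/temp/$entity/$userid/$entity.csv",
      s"$hdfsPath/final/$userid/$entity.csv")
  }

which you would then call with moveEntityFiles(hdfsPath, "Tweets") and moveEntityFiles(hdfsPath, "Mentions").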

If your HDFS root folder (the one containing the temp and final folders) is "src/test/resources" (the one I used for testing):

moveTweetFiles("src/test/resources")
moveMentionsFiles("src/test/resources")

By the way, FileSystem already comes with Spark's dependencies (no additional dependency needed).
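
If you run this outside a Spark runtime, you'd need to bring the Hadoop client yourself; a possible sbt line (the version is only an example, match your cluster's Hadoop version):

// build.sbt - only needed when not running inside a Spark distribution
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.3"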

This can be launched as a Spark job with spark-submit, even though we don't use any Spark pipeline, or simply from the spark-shell.

Upvotes: 2
