osk

Reputation: 810

Load a .csv file from HDFS in Scala

So I basically have the following code to read a .csv file and store it in an Array[Array[String]]:

def load(filepath: String): Array[Array[String]] = {
      var data = Array[Array[String]]()
      val bufferedSource = io.Source.fromFile(filepath)
      for (line <- bufferedSource.getLines) {
        data = data :+ line.split(",").map(_.trim) // :+ returns a new array, so reassign it
      }
      bufferedSource.close
      data.drop(1) // skip header
  }

Which works for files that are not stored on HDFS. However, when I try the same thing on HDFS I get

No such file or directory found

When writing to a file on HDFS I also had to change my original code, passing FileSystem and Path objects to get a stream for PrintWriter, but this time I have no idea how to do the equivalent for reading.
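For reference, the write side can be sketched like this. The ByteArrayOutputStream is a stand-in so the snippet is self-contained; on HDFS you would instead obtain the stream from `FileSystem.get(sc.hadoopConfiguration).create(new Path(filepath))`:

```scala
import java.io.{ByteArrayOutputStream, PrintWriter}

// PrintWriter works over any OutputStream. On HDFS the stream would come from
//   val fs  = FileSystem.get(sc.hadoopConfiguration)
//   val out = fs.create(new Path(filepath))
// Here a ByteArrayOutputStream stands in for illustration.
val out = new ByteArrayOutputStream()
val writer = new PrintWriter(out)
writer.println("id,name")
writer.println("1,foo")
writer.close()

val written = out.toString("UTF-8")
```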

I am this far:

  def load(filepath: String, sc: SparkContext): Array[Array[String]] = {
      var data = Array[Array[String]]()
      val fs = FileSystem.get(sc.hadoopConfiguration)
      val stream = fs.open(new Path(filepath))
      var line = ""
      while ((line = stream.readLine()) != null) {
        data :+ line.split(",").map(_.trim)
      }

      return data.slice(1,data.length-1) //skip header
  }

This should work, but I get a NullPointerException when comparing line to null, or when checking whether its length is greater than 0.
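A minimal, self-contained illustration of what goes wrong in that while condition: unlike Java, a Scala assignment is an expression of type Unit, not the assigned value, so `(line = stream.readLine()) != null` compares `()` to null and is always true; the loop only exits when readLine returns null and `line.split` throws the NullPointerException:

```scala
var line: String = "start"

// In Scala, an assignment evaluates to (), the sole value of type Unit,
// not to the value that was assigned (unlike Java/C).
val assignmentResult: Unit = (line = null)

println(assignmentResult != null) // always true: () is a value, never null
println(line == null)             // true: the assignment itself did happen
```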

Upvotes: 0

Views: 2288

Answers (2)

osk

Reputation: 810

This code will read a .csv file from HDFS:

  import scala.collection.mutable.ArrayBuffer
  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.SparkContext

  def read(filepath: String, sc: SparkContext): ArrayBuffer[Array[String]] = {
      val data = ArrayBuffer[Array[String]]()
      val fs = FileSystem.get(sc.hadoopConfiguration)
      val stream = fs.open(new Path(filepath))
      // Note: readLine() on the stream is inherited from DataInputStream and
      // deprecated; wrapping the stream in a BufferedReader is preferable.
      var line = stream.readLine()
      while (line != null) {
        data += line.split(",").map(_.trim)
        line = stream.readLine()
      }
      stream.close()

      data // or data.drop(1) to skip the header row
  }

Upvotes: 1

Nicolas Cailloux

Reputation: 448

Please read this post about reading CSV by Alvin Alexander, author of the Scala Cookbook:

object CSVDemo extends App {
  println("Month, Income, Expenses, Profit")
  val bufferedSource = io.Source.fromFile("/tmp/finance.csv")
  for (line <- bufferedSource.getLines) {
    val cols = line.split(",").map(_.trim)
    // do whatever you want with the columns here
    println(s"${cols(0)}|${cols(1)}|${cols(2)}|${cols(3)}")
  }
  bufferedSource.close
}

You just have to get an InputStream from HDFS and use it with io.Source.fromInputStream in place of io.Source.fromFile in this snippet.
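A self-contained sketch of that substitution; the ByteArrayInputStream stands in for the stream that `FileSystem.get(sc.hadoopConfiguration).open(new Path(filepath))` would return on HDFS:

```scala
import java.io.ByteArrayInputStream
import scala.io.Source

// Stand-in for the HDFS stream:
//   val stream = FileSystem.get(sc.hadoopConfiguration).open(new Path(filepath))
val csv = "Month,Income,Expenses,Profit\nJan,100,60,40\n"
val stream = new ByteArrayInputStream(csv.getBytes("UTF-8"))

// Source.fromInputStream gives the same BufferedSource API as fromFile
val bufferedSource = Source.fromInputStream(stream)
val rows = bufferedSource.getLines().map(_.split(",").map(_.trim)).toArray
bufferedSource.close()
```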

Upvotes: -1
