Reputation: 810
So I basically have the following code to read a .csv file and store it in an Array[Array[String]]:
def load(filepath: String): Array[Array[String]] = {
  var data = Array[Array[String]]()
  val bufferedSource = io.Source.fromFile(filepath)
  for (line <- bufferedSource.getLines) {
    data = data :+ line.split(",").map(_.trim)
  }
  bufferedSource.close
  return data.drop(1) // skip header
}
This works for files that are not stored on HDFS. However, when I try the same thing on HDFS, I get:

No such file or directory found

When writing to a file on HDFS I also had to change my original code and pass FileSystem and Path arguments to the PrintWriter, but this time I have no idea at all how to do it.
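For reference, a minimal sketch of what that write-side change might look like with the standard Hadoop FileSystem API (the save name and the Seq[String] parameter are just illustrative, not my actual code):

import java.io.PrintWriter
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// Hypothetical sketch: wrap an HDFS output stream in a PrintWriter.
def save(filepath: String, sc: SparkContext, lines: Seq[String]): Unit = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val writer = new PrintWriter(fs.create(new Path(filepath)))
  try lines.foreach(l => writer.println(l))
  finally writer.close()
}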
This is as far as I have got:
def load(filepath: String, sc: SparkContext): Array[Array[String]] = {
  var data = Array[Array[String]]()
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val stream = fs.open(new Path(filepath))
  var line = ""
  while ((line = stream.readLine()) != null) {
    data = data :+ line.split(",").map(_.trim)
  }
  return data.drop(1) // skip header
}
This should work, but I get a NullPointerException when comparing line to null, or whenever I check whether its length is greater than 0.
Upvotes: 0
Views: 2288
Reputation: 810
This code will read a .csv file from HDFS. The NullPointerException in the question comes from the fact that in Scala an assignment such as line = stream.readLine() is an expression of type Unit, so the != null test never sees the line itself; reading the line before the loop condition avoids this:
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

def read(filepath: String, sc: SparkContext): ArrayBuffer[Array[String]] = {
  val data = ArrayBuffer[Array[String]]()
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val stream = fs.open(new Path(filepath))
  var line = stream.readLine()
  while (line != null) {
    val row = line.split(",").map(_.trim)
    data += row
    line = stream.readLine()
  }
  stream.close()
  return data // or return data.drop(1) to skip the header
}
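One caveat: FSDataInputStream inherits readLine() from java.io.DataInputStream, which is deprecated and does not handle character encodings properly. A variant sketch that wraps the stream in a BufferedReader instead (the readBuffered name and the UTF-8 charset are assumptions):

import scala.collection.mutable.ArrayBuffer
import java.io.{BufferedReader, InputStreamReader}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

def readBuffered(filepath: String, sc: SparkContext): ArrayBuffer[Array[String]] = {
  val data = ArrayBuffer[Array[String]]()
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // Wrap the HDFS stream in a BufferedReader to avoid the deprecated readLine()
  val reader = new BufferedReader(new InputStreamReader(fs.open(new Path(filepath)), "UTF-8"))
  try {
    var line = reader.readLine()
    while (line != null) {
      data += line.split(",").map(_.trim)
      line = reader.readLine()
    }
  } finally reader.close()
  data
}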
Upvotes: 1
Reputation: 448
Please read this post about reading CSV files by Alvin Alexander, author of the Scala Cookbook:
object CSVDemo extends App {
  println("Month, Income, Expenses, Profit")
  val bufferedSource = io.Source.fromFile("/tmp/finance.csv")
  for (line <- bufferedSource.getLines) {
    val cols = line.split(",").map(_.trim)
    // do whatever you want with the columns here
    println(s"${cols(0)}|${cols(1)}|${cols(2)}|${cols(3)}")
  }
  bufferedSource.close
}
You just have to get an InputStream from HDFS and use it in place of fromFile in this snippet.
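A minimal sketch of that substitution, assuming a SparkContext named sc is in scope and reusing the /tmp/finance.csv path from the snippet above:

import scala.io.Source
import org.apache.hadoop.fs.{FileSystem, Path}

// Feed the HDFS stream to scala.io.Source instead of fromFile
val fs = FileSystem.get(sc.hadoopConfiguration)
val bufferedSource = Source.fromInputStream(fs.open(new Path("/tmp/finance.csv")))
for (line <- bufferedSource.getLines) {
  val cols = line.split(",").map(_.trim)
  println(cols.mkString("|"))
}
bufferedSource.close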
Upvotes: -1