user3684014

Reputation: 1175

Convert csv to RDD

I tried the accepted solution from "How do I convert csv file to rdd". I want to print all the users except "om":

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) // split each line into trimmed fields
val header = new SimpleCSVHeader(data.take(1)(0)) // build the header from the first line
val rows = data.filter(line => header(line,"user") != "om") // keep rows whose user is not "om"
val users = rows.map(row => header(row,"user"))
users.collect().foreach(user => println(user))

but I got an error:

java.util.NoSuchElementException: key not found: user

I tried to debug it and inspected the index attribute of header (image not available).

I'm new to Spark and Scala. Does this mean that user is already in the Map? If so, why do I get the "key not found" error?

Upvotes: 2

Views: 1679

Answers (2)

cmd

Reputation: 11841

I've rewritten the sample code to remove the header method. IMO, this version is a step-by-step walkthrough that is easier to follow. Here is a more detailed explanation.

def main(args: Array[String]): Unit = {
  val csv = sc.textFile("/path/to/your/file.csv")

  // split / clean data
  val headerAndRows = csv.map(line => line.split(",").map(_.trim))
  // get header
  val header = headerAndRows.first
  // filter out header
  val data = headerAndRows.filter(_(0) != header(0))
  // splits to map (header/value pairs)
  val maps = data.map(splits => header.zip(splits).toMap)
  // filter out the 'om' user
  val result = maps.filter(map => map("user") != "om")
  // print result
  result.foreach(println)
}
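The zip/toMap step can also be tried without a Spark context. The following is a minimal sketch on an in-memory Seq, assuming an unquoted CSV with the same three columns as the example data; the object name ZipToMapSketch and the helper splitLine are hypothetical. It uses map.get("user"), which returns an Option, so a missing key yields no result instead of throwing NoSuchElementException.

```scala
object ZipToMapSketch {
  // Sample lines, assumed for illustration (unquoted CSV)
  val lines: Seq[String] = Seq(
    "user,topic,hits",
    "om,scala,120",
    "daniel,spark,80",
    "3754978,spark,1"
  )

  // Split a line on commas and trim each field (hypothetical helper)
  def splitLine(line: String): Array[String] = line.split(",").map(_.trim)

  // The first line is the header
  val header: Array[String] = splitLine(lines.head)

  // Pair each header name with the value in the same position
  val maps: Seq[Map[String, String]] =
    lines.tail.map(line => header.zip(splitLine(line)).toMap)

  // get returns None for a missing key, so flatMap simply skips such rows
  val users: Seq[String] = maps.flatMap(_.get("user")).filter(_ != "om")

  def main(args: Array[String]): Unit = users.foreach(println)
}
```

If the header in the file does not contain a plain user column, users comes back empty rather than failing, which makes a header mismatch easier to spot.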

Upvotes: 1

user3684014

Reputation: 1175

I found my mistake, and it's not related to Spark/Scala. I created the example csv with these commands in R:

df <- data.frame(user=c('om','daniel','3754978'),topic=c('scala','spark','spark'),hits=c(120,80,1))
write.csv(df, "df.csv",row.names=FALSE)

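By default, write.csv puts double quotes around character and factor values and around the column names, so the header key in the file is really "user" (with the quotes included). That is why the map can't find the key user. Writing the file with quote=FALSE solves the problem:

write.csv(df, "df.csv",quote=FALSE, row.names=FALSE)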
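If regenerating the file is not an option, the quotes could instead be stripped on the Scala side. This is only a rough sketch: unquote and parseLine are hypothetical helpers, and the naive split on "," does not handle commas inside quoted fields, for which a real CSV parser would be the safer choice.

```scala
object UnquoteSketch {
  // Remove one pair of surrounding double quotes, if present
  def unquote(field: String): String = {
    val t = field.trim
    if (t.length >= 2 && t.startsWith("\"") && t.endsWith("\""))
      t.substring(1, t.length - 1)
    else t
  }

  // Naive split: does NOT handle commas inside quoted fields
  def parseLine(line: String): Array[String] = line.split(",").map(unquote)

  def main(args: Array[String]): Unit = {
    // Header and first row as written with quoting enabled
    val header = parseLine("\"user\",\"topic\",\"hits\"")
    val row = parseLine("\"om\",\"scala\",120")
    val record = header.zip(row).toMap
    println(record("user")) // prints om
  }
}
```

In the Spark code, the same unquote function could be applied in place of the trim inside the map over split fields.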

Upvotes: 3
