Reputation: 190
The following is an input file for Apache Spark processing. Each record contains four fields, as shown.
User ID Movie ID Rating Date
6 9217 2 2005-04-28 00:00:00.000
6 9254 4 2005-04-20 00:00:00.000
6 9330 4 2004-11-17 00:00:00.000
6 9330 5 2004-09-27 00:00:00.000
6 1615 4 2004-09-15 00:00:00.000
6 1659 3 2005-02-18 00:00:00.000
6 9254 4 2005-10-26 00:00:00.000
6 9217 3 2005-11-25 00:00:00.000
6 9217 3 2004-09-15 00:00:00.000
From each record, I need to extract Movie ID and Rating into key/value pairs, e.g. (Movie ID, Rating). I went through the Apache Spark documentation and did some searching, but couldn't find anything relevant. Any suggestions would be appreciated.
Upvotes: 0
Views: 423
Reputation: 4667
You can do something like this:
val text = sc.textFile("movies.txt")
val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
val logEntries = text.flatMap(line => line match {
  case LogEntry(userid, movieid, rating, date) => Some((userid.toInt, movieid.toInt, rating.toInt, date))
  case _ => None
}).cache()
val movieTotalRating = logEntries
  .map { case (userid, movieid, rating, date) => (movieid, rating) }
  .reduceByKey((rating1, rating2) => rating1 + rating2)
Notice that I use .flatMap on the Option return type to remove rows that do not match the regex (like the header row). By caching logEntries, you can compute several statistics (like movieTotalRating here) from the same parsed dataset.
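For example, a second statistic such as the average rating per movie could be derived from the same cached tuples. This is a minimal sketch only: plain Scala collections stand in for the RDD so it runs without a Spark cluster, and the sample data and name movieAvgRating are assumptions, not part of the thread.

```scala
// Sketch: reusing the parsed (userid, movieid, rating, date) tuples for a
// second statistic -- average rating per movie.
// On Spark the equivalent would be:
//   logEntries.map { case (_, m, r, _) => (m, (r, 1)) }
//             .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
//             .mapValues { case (sum, count) => sum.toDouble / count }
val logEntries = List(
  (6, 9217, 2, "2005-04-28 00:00:00.000"),
  (6, 9217, 3, "2005-11-25 00:00:00.000"),
  (6, 9254, 4, "2005-04-20 00:00:00.000")
)

val movieAvgRating = logEntries
  .map { case (_, movieid, rating, _) => (movieid, rating) }  // keep (Movie ID, Rating)
  .groupBy(_._1)                                              // group pairs by Movie ID
  .map { case (movieid, pairs) =>
    val ratings = pairs.map(_._2)
    (movieid, ratings.sum.toDouble / ratings.size)            // mean rating per movie
  }
// movieAvgRating: Map(9217 -> 2.5, 9254 -> 4.0)
```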
Upvotes: 2
Reputation: 190
I could do it on a sample row using pattern matching, but I'm not sure how to apply it to all the lines of the input file to generate the desired key/value pairs (Movie ID, Rating).
scala> val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
LogEntry: scala.util.matching.Regex = ^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)
scala> val LogEntry(userid,movieid,rating,date) = "6 9217 2 2005-04-28 00:00:00.000"
userid: String = 6
movieid: String = 9217
rating: String = 2
date: String = 2005-04-28 00:00:00.000
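Applying the same pattern to every line just means mapping it over the collection of lines. A minimal sketch, with a plain Scala List standing in for the RDD (the sample lines here are assumptions); with Spark the identical flatMap works unchanged on sc.textFile("movies.txt"):

```scala
// Same regex as in the REPL session above.
val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r

// A plain List stands in for the RDD of lines from the input file.
val lines = List(
  "User ID Movie ID Rating Date",        // header: no match, silently dropped
  "6 9217 2 2005-04-28 00:00:00.000",
  "6 9254 4 2005-04-20 00:00:00.000"
)

// flatMap over Option keeps only matching lines as (Movie ID, Rating) pairs.
val pairs = lines.flatMap {
  case LogEntry(_, movieid, rating, _) => Some((movieid.toInt, rating.toInt))
  case _                               => None
}
// pairs: List((9217, 2), (9254, 4))
```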
Upvotes: 0