Reputation: 135
I was trying to do a movie recommendation system and have been following this website. LinkHere
def count_ratings_users_freq(self, user_id, values):
"""
For each user, emit a row containing their "postings"
(item,rating pairs)
Also emit user rating sum and count for use later steps.
output:
userid, number of movie rated by user, rating number count, (movieid, movie rating)
17 1,3,(70,3)
35 1,1,(21,1)
49 3,7,(19,2 21,1 70,4)
87 2,3,(19,1 21,2)
98 1,2,(19,2)
"""
item_count = 0
item_sum = 0
final = []
for item_id, rating in values:
item_count += 1
item_sum += rating
final.append((item_id, rating))
yield user_id, (item_count, item_sum, final)
Is it possible to translate the above code to Java with Hadoop Map and Reduce?
userid
as key
no. movie rated by user, rating number count, (movieid, movie ratings)
as value.
Thank you!
Upvotes: 0
Views: 7193
Reputation: 6343
Yes, you can convert this into a map reduce program.
The mapper logic:
The reducer logic:
For each value, you need parse the value and get the "movie rating". For e.g for value (70,3), you will parse the movie rating = 3.
For each valid record, you will increment movieCount. You will add the parsed "movie rating" to "movieRatingCount" and append the value to "movieValues" string.
You will get the desired output.
Following is the piece of code, which achieves this:
package com.myorg.hadooptests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MovieRatings {
public static class MovieRatingsMapper
extends Mapper<LongWritable, Text , IntWritable, Text>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String valueStr = value.toString();
int index = valueStr.indexOf(',');
if(index != -1) {
try
{
IntWritable keyUserID = new IntWritable(Integer.parseInt(valueStr.substring(0, index)));
context.write(keyUserID, new Text(valueStr.substring(index + 1)));
}
catch(Exception e)
{
// You could get a NumberFormatException
}
}
}
}
public static class MovieRatingsReducer
extends Reducer<IntWritable, Text, IntWritable, Text> {
public void reduce(IntWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
int movieCount = 0;
int movieRatingCount = 0;
String movieValues = "";
for (Text value : values) {
String[] tokens = value.toString().split(",");
if(tokens.length == 2)
{
movieRatingCount += Integer.parseInt(tokens[1].trim()); // You could get a NumberFormatException
movieCount++;
movieValues = movieValues.concat(value.toString() + " ");
}
}
context.write(key, new Text(Integer.toString(movieCount) + "," + Integer.toString(movieRatingCount) + ",(" + movieValues.trim() + ")"));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "CompositeKeyExample");
job.setJarByClass(MovieRatings.class);
job.setMapperClass(MovieRatingsMapper.class);
job.setReducerClass(MovieRatingsReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/in/in2.txt"));
FileOutputFormat.setOutputPath(job, new Path("/out/"));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
For the input:
17,70,3
35,21,1
49,19,2
49,21,1
49,70,4
87,19,1
87,21,2
98,19,2
I got the output:
17 1,3,(70,3)
35 1,1,(21,1)
49 3,7,(70,4 21,1 19,2)
87 2,3,(21,2 19,1)
98 1,2,(19,2)
Upvotes: 5