user2875021
user2875021

Reputation: 135

Java Hadoop MapReduce Multiple Keys Values

I am implementing a movie recommendation system in Java and have been following this website Link Here

Input: userId movieRatingCount,ratingSum,(movieId,movieRating)

17    1,3,(70,3)
35    1,1,(21,1)
49    3,7,(19,2 21,1 70,4)
87    2,3,(19,1 21,2)
98    1,2,(19,2)

the code:

def pairwise_items(self, user_id, values):
    item_count, item_sum, ratings = values
    #print item_count, item_sum, [r for r in combinations(ratings, 2)]
    #bottleneck at combinations
    for item1, item2 in combinations(ratings, 2):
        yield (item1[0], item2[0]), \
                (item1[1], item2[1])

Output: firstMovieId, secondMovieId firstRating,secondRating

19,21  2,1
19,70  2,4
21,70  1,4
19,21  1,2

For example, for userId 49, he watched 3 movies. The output will be

firstMovie, secondMovie firstMovieRatings, secondMovieRatings
firstMovie, thirdMovie firstMovieRatings, thirdMovieRatings
secondMovie, thirdMovie secondMovieRatings, thirdMovieRatings

For user who watched 1 movie, that output will be skipped.

Is it possible to translate this python code to Java? I have no clue what the map output key and value will be. As well as the way to tackle this problem. Thank you in advance!

Upvotes: 1

Views: 4657

Answers (1)

Manjunath Ballur
Manjunath Ballur

Reputation: 6343

Mapper Logic:

  1. Assumes that input has key/values as tab separated. For e.g. "49 3,7,(19,2 21,1 70,4)"
  2. In the value, it searches for "(" and parses out the string between "(" and ")"
  3. It emits (key,value) as (UserId, (movieId,movieRating)). For e.g. for the record "49 3,7,(19,2 21,1 70,4)", it emits key:49, value: 19,2 21,1 70,4

Reducer Logic:

  1. It splits the value on blank (" "). For e.g. it splits "19,2 21,1 70,4" into 3 strings: "19,2", "21,1" and "70,4". These values are added to an ArrayList

  2. All the 2-way combinations for these values are computed.

  3. Finally these combinations are emitted to output.

Following is the code:

package com.myorg.hadooptests;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MovieGroupings {

    public static class MovieGroupingsMapper
            extends Mapper<LongWritable, Text , Text, Text>{

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String valueStr = value.toString().trim();
            String[] tokens = valueStr.split("\t"); // Assume key/values to be tab seperated. For e.g. "17    1,3,(70,3)"

            if(tokens.length == 2) {
                int index = tokens[1].indexOf('('); // Search for "(" character
                if(index != -1)
                {
                    context.write(new Text(tokens[0]), new Text(tokens[1].substring(index+1, tokens[1].length() - 1)));  // Exclude '(' and ')'
                }
            }
        }
    }

    public static class MovieGroupingsReducer
            extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {

            for (Text value : values) {
                String[] tokens = value.toString().split(" "); // Split the values based on blank character

                if(tokens.length >= 2) // Ignore if there is only one movie
                {
                    for(int i = 0; i < tokens.length; i++)
                        for(int j = i + 1; j < tokens.length; j++) {
                            String groupings = tokens[i] + "," + tokens[j]; // Add 2 movies with ",". For e.g. "19,2,21,1"
                            String[] moviesAndRatings = groupings.split(",");
                            if (moviesAndRatings.length == 4)
                                context.write(new Text(moviesAndRatings[0] + "," + moviesAndRatings[2]),
                                        new Text(moviesAndRatings[1] + "," + moviesAndRatings[3]));
                        }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "MovieGroupings");
        job.setJarByClass(MovieGroupings.class);
        job.setMapperClass(MovieGroupingsMapper.class);
        job.setReducerClass(MovieGroupingsReducer.class);
        job.setNumReduceTasks(5);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/in/in5.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/out/"));

        System.exit(job.waitForCompletion(true) ? 0:1);
    }
}

For the following input:

17      1,3,(70,3)
35      1,1,(21,1)
49      3,7,(19,2 21,1 70,4)
87      2,3,(19,1 21,2)
98      1,2,(19,2)

Output generated is:

19,21   2,1
19,70   2,4
21,70   1,4
19,21   1,2

Upvotes: 3

Related Questions