Reputation: 129
I've been trying to implement the TF-IDF algorithm using MapReduce in Hadoop. My TF-IDF computation takes place in 4 steps (I call them MR1, MR2, MR3, MR4). Here are my inputs/outputs:
MR1: (offset, line) ==(Map)==> (word|file, 1) ==(Reduce)==> (word|file, n)
MR2: (word|file, n) ==(Map)==> (file, word|n) ==(Reduce)==> (word|file, n|N)
MR3: (word|file, n|N) ==(Map)==> (word, file|n|N|1) ==(Reduce)==> (word|file, n|N|M)
MR4: (word|file, n|N|M) ==(Map)==> (word|file, n/N log D/M)
Where n = the number of occurrences of each (word, file) pair (i.e., how many times the word appears in the file), N = the total number of words in each file, M = the number of documents in which each word appears, and D = the total number of documents.
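For clarity, the value MR4 ends up computing is the usual tf-idf weight; here is a minimal sketch of that final formula in plain Java (outside Hadoop, just to pin down the arithmetic):
// Sketch of the weight computed in MR4, using the quantities above:
// n = occurrences of the word in the file, N = words in the file,
// M = documents containing the word, D = total number of documents.
// (Math.log is the natural log; the base is a free choice.)
static double tfIdf(int n, int N, int M, int D) {
    double tf  = (double) n / N;            // term frequency
    double idf = Math.log((double) D / M);  // inverse document frequency
    return tf * idf;
}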
After the MR1 phase, I'm getting the correct output; for example: hello|hdfs://..... 2
For the MR2 phase, I expect: hello|hdfs://....... 2|192
but I'm getting: 2|hello|hdfs://...... 192|192
I'm pretty sure my code is correct; every time I try to add a string to my "value" in the reduce phase to see what's going on, the same string gets "teleported" into the key part.
Example: gg|word|hdfs://.... gg|192
Here is my MR1 code:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR1 {

    /* Mapper class:
     * Input:  (offset, line)
     * Output: (word|file, 1)
     * Emits a 1 for each word of each line.
     */
    static class MR1Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context contexte)
                throws IOException, InterruptedException {
            // Retrieve the name of the file this split belongs to
            FileSplit split = (FileSplit) contexte.getInputSplit();
            String fileName = split.getPath().toString();

            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, "' \t:,;:!?./-_()[]{}\"&%<>");
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken().toLowerCase();
                contexte.write(new Text(word + "|" + fileName), new IntWritable(1));
            }
        }
    }
    /* Reducer class: counts the total number of occurrences per word/file.
     * Input:  (word|file, x)
     * Output: (word|file, n)
     */
    public static class MR1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context contexte)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            contexte.write(key, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MR1 <source> <destination>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MR1.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MR1Mapper.class);
        job.setCombinerClass(MR1Reducer.class);
        job.setReducerClass(MR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Here is my MR2 code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR2 {

    /* Mapper: isolate the file name.
     * Input:  (word|file, n)
     * Output: (file, word|n)
     */
    static class MR2Mapper extends Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, Context contexte)
                throws IOException, InterruptedException {
            String skey = key.toString();
            String word = skey.substring(0, skey.indexOf("|"));
            String fileName = skey.substring(skey.indexOf("|") + 1);
            contexte.write(new Text(fileName), new Text(word + "|" + value));
        }
    }
    /* Reducer: sums the number of occurrences of every word in the file.
     * Input:  (file, word|n)
     * Output: (word|file, n|N)
     */
    public static class MR2Reducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context contexte)
                throws IOException, InterruptedException {
            int N = 0;

            // The values iterator can only be traversed once, so the values
            // are kept in an ArrayList in order to loop over them again.
            ArrayList<String> altmp = new ArrayList<String>();

            // 1st pass: compute the total number of words in the file (N)
            for (Text val : values) {
                String sval = val.toString();
                String sn = sval.substring(sval.indexOf("|") + 1);
                int n = Integer.parseInt(sn);
                altmp.add(sval);
                N += n;
            }

            // 2nd pass: emit (word|file, n|N) for each word of the file
            Iterator<String> it = altmp.iterator();
            while (it.hasNext()) {
                String sval = it.next();
                String word = sval.substring(0, sval.indexOf("|"));
                String sn = sval.substring(sval.indexOf("|") + 1);
                int n = Integer.parseInt(sn);
                // I tried to replace n with "gg" here, still same teleporting issue
                contexte.write(new Text(word + "|" + key.toString()), new Text(n + "|" + N));
            }
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MR2 <source> <destination>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MR2.class);

        // The HDFS file to use as input
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(MR2Mapper.class);
        job.setCombinerClass(MR2Reducer.class);
        job.setReducerClass(MR2Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Any help would be appreciated.
Upvotes: 1
Views: 84
Reputation: 1387
It's the Combiner's fault. You are specifying in the driver class that you want to use MR2Reducer both as a Combiner and as a Reducer with the following lines:
job.setCombinerClass (MR2Reducer.class) ;
job.setReducerClass (MR2Reducer.class) ;
However, a Combiner runs within the scope of a single Map task, while the Reducer runs only after all the Mappers have finished. By setting a Combiner, you are essentially having MR2Reducer execute right after each individual Mapper task, so it computes N and splits the composite value of each key-value pair within that Mapper's scope. As a result, the Reduce phase kicks off with input already in the (word|file, n|N) key-value schema (i.e., the output of an MR2Reducer run before the Reduce phase) instead of the desired (file, word|n) schema. Working unknowingly on the wrong schema, the Reducer splits the composite value at the wrong place, which is why the output key-value pairs look wonky, wrong, and/or reversed.
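The general rule is that a Combiner's input and output key-value types and formats must match the Mapper's output exactly, because the framework may run it zero or more times on map output before the shuffle. That is also why reusing MR1Reducer as a Combiner in MR1 is harmless: its input and output share the same (word|file, count) schema. Purely to illustrate that contract (not something worth actually adding to the job), a schema-preserving Combiner for MR2 could only pass records through unchanged, along these lines:
// Illustrative sketch only: an identity Combiner that keeps the Mapper's
// (file, word|n) schema intact -- the property MR2Reducer violates when it
// is used as a Combiner.
public static class MR2IdentityCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context contexte)
            throws IOException, InterruptedException {
        for (Text val : values) {
            contexte.write(key, val); // same key, same value format as the map output
        }
    }
}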
To fix this, you can either:
1. keep MR2Reducer as the Combiner and change the MR2Reducer class so that it also accepts key-value pairs in the (word|file, n|N) schema (not recommended, as it would probably negate the benefits in scalability and execution time and only make your MapReduce job more complicated than it needs to be), or
2. delete the job.setCombinerClass (MR2Reducer.class) ; line from your driver class to keep things simple and functional, so you can build from there in the future (a sketch of the resulting driver is shown after the output below).
To showcase this, I used your MR1 and MR2 classes locally on my machine, deleted the job.setCombinerClass (MR2Reducer.class) ; line, used this input stored in HDFS, and verified that the output key-value pairs are as desired. Here is a snippet of the output after the execution:
balance|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suppress|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
back|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 4|661
after|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suspicious|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swang|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swinging|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
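For completeness, the only change needed in the question's MR2 driver is dropping the combiner line; here is a sketch of the resulting main (identical to the asker's, minus setCombinerClass):
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage: MR2 <source> <destination>");
        System.exit(-1);
    }
    Job job = new Job();
    job.setJarByClass(MR2.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setMapperClass(MR2Mapper.class);
    // no setCombinerClass(...) here: MR2Reducer must only run in the Reduce phase
    job.setReducerClass(MR2Reducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}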
Upvotes: 3