Reputation: 7995
I have a simple MapReduce job which is supposed to read a dictionary from a text file and then process another huge file line by line and compute the inverse document matrix. The output is supposed to look like this:
word-id1 docX:tfX docY:tfY
word-id2 docX:tfX docY:tfY etc...
However, the output of the reducer is emitted as a single huge line. I do not understand why, since it should emit a new line for each word-id
(which is the key for the reducer).
The mapper produces the correct output (pairs of word-id
and values of doc-id:tf
on separate lines). I tested that without the reducer. The reducer is supposed to just append the values corresponding to the same key on one line for each key.
Could you please take a look at my code (specifically at the reducer and the configuration of the job) and tell me why the reducer emits only one huge line instead of multiple lines corresponding to the specified keys? I spent many hours debugging this and cannot wrap my head around it.
public class Indexer extends Configured implements Tool {
* Vocabulary: key = term, value = index
private static Map<String, Integer> vocab = new HashMap<String, Integer>();
// Program entry point: terminates the JVM with the status passed to System.exit(...).
// NOTE(review): the expression inside System.exit(...) is truncated in this listing
// (unbalanced parentheses, no call target before Indexer()) - presumably it was
// ToolRunner.run(new Indexer(), arguments); confirm against the original source.
public static void main(String[] arguments) throws Exception {
System.exit( Indexer(), arguments));
// Comparator that inverts the natural ordering of the two keys it is given.
// NOTE(review): it is constructed for Text.class, while IndexerMapper declares IntWritable
// as its output key type; how (or whether) this comparator is attached to the job is not
// visible in this listing - confirm.
public static class Comparator extends WritableComparator {
protected Comparator() {
// Register Text as the key class; the boolean asks the base class to create key instances.
super(Text.class, true);
public int compare(WritableComparable a, WritableComparable b) {
// Here we exploit the implementation of compareTo(...) in
// Text.class.
// Negating the result reverses the sort order.
return -a.compareTo(b);
// Mapper that reads one input line (a document index followed by its words) and emits,
// for every distinct vocabulary word on that line, the pair
// (term index, docIndex:termFrequency).
public static class IndexerMapper extends
Mapper<Object, Text, IntWritable, Text> {
// NOTE(review): this field is not referenced anywhere in the visible code.
private Text result = new Text();
// load vocab from distributed cache
// Only the first cache file is used. Each of its lines is tokenized on spaces and tabs:
// the first token is parsed as the term index, the second is the term itself.
// NOTE(review): the reader construction below is truncated in this listing (the stream
// argument is missing) - presumably fs.open(getPath); confirm. No close() of the reader
// is visible either.
public void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
FileSystem fs = FileSystem.get(conf);
URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
Path getPath = new Path(cacheFiles[0].getPath());
BufferedReader bf = new BufferedReader(new InputStreamReader(;
String line = null;
while ((line = bf.readLine()) != null) {
StringTokenizer st = new StringTokenizer(line, " \t");
int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id
String word = st.nextToken(); // second element is the term
// save vocab
vocab.put(word, index);
// Processes a single input line: counts how often each vocabulary word occurs on it,
// then writes one record per distinct word.
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// init TF map
Map<String, Integer> mapTF = new HashMap<String, Integer>();
// parse input string
StringTokenizer st = new StringTokenizer(value.toString(), " \t");
// first element is doc index
int index = Integer.parseInt(st.nextToken());
// count term frequencies
String word;
while (st.hasMoreTokens()) {
word = st.nextToken();
// check if word is in the vocabulary
if (vocab.containsKey(word)) {
if (mapTF.containsKey(word)) {
int count = mapTF.get(word);
mapTF.put(word, count + 1);
} else {
mapTF.put(word, 1);
// emit the raw term frequency per term; no IDF factor is computed in this method
int wordIndex;
for (String term : mapTF.keySet()) {
int tf = mapTF.get(term);
// NOTE(review): appears redundant - words are only added to mapTF after the same
// vocabulary check in the counting loop above.
if (vocab.containsKey(term)) {
wordIndex = vocab.get(term);
context.write(new IntWritable(wordIndex), new Text(index + ":" + tf));
// Reducer that joins all values received for one key into a single space-separated
// string and writes one (key, joined values) record per reduce() call.
// NOTE(review): the braces of this class and method are missing in this listing.
public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text>
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
// Start with an initial buffer capacity of 16000 characters.
StringBuilder sb = new StringBuilder(16000);
// Every value is followed by a space, so the joined string ends with a trailing space.
for (Text value : values)
sb.append(value.toString() + " ");
context.write(key, new Text(sb.toString()));
* This is where the MapReduce job is configured and being launched.
// Configures and launches the job: declares the input and output arguments and reads them
// as paths, registers the vocabulary file as a cache file, sets the job's input and output
// paths, deletes an existing output directory, and returns 0 if the job completes
// successfully or 1 otherwise.
// NOTE(review): several statements appear to be missing from this listing - the comments
// about MapReduce setup, reversed sorting and (key, value) types have no code under them,
// and the addCacheFile call has lost its receiver (presumably DistributedCache). Confirm
// against the original source before relying on this configuration.
public int run(String[] arguments) throws Exception {
ArgumentParser parser = new ArgumentParser("TextPreprocessor");
parser.addArgument("input", true, true, "specify input directory");
parser.addArgument("output", true, true, "specify output directory");
Path inputPath = new Path(parser.getString("input"));
Path outputDir = new Path(parser.getString("output"));
// Create configuration.
Configuration conf = getConf();
// add distributed file with vocabulary
.addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);
// Create job.
Job job = new Job(conf, "WordCount");
// Setup MapReduce.
// Sort the output words in reversed order.
// Specify (key, value).
// Input.
FileInputFormat.addInputPath(job, inputPath);
// Output.
FileOutputFormat.setOutputPath(job, outputDir);
FileSystem hdfs = FileSystem.get(conf);
// Delete output directory (if exists).
// The second argument (true) makes the delete recursive.
if (hdfs.exists(outputDir))
hdfs.delete(outputDir, true);
// Execute the job.
// Blocks until the job finishes; true enables progress output.
return job.waitForCompletion(true) ? 0 : 1;
Upvotes: 0
Views: 1481
Reputation: 1811
Try these to debug your issue -
Upvotes: 1