Widget

Reputation: 27

Map Reduce Line Frequency of Words

I am currently working on a Hadoop project in Java. My objective is to write a MapReduce job that counts the line frequency of every word; that is, not the total number of times a word appears in the input file, but the number of lines it occurs on. If a word occurs more than once in a line, it should only be counted once for that line. I have a basic word count MapReduce working, which I will post below, but I am a little lost on how to count the line frequency of words instead of the full word count. Any help would be appreciated, thanks a lot.
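For example, if one input line is

hello world hello hello

then hello should only be counted once for that line, not three times, so the expected output would be hello 1 and world 1.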

MapWordCount

public class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Text wordToken = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // dividing String into tokens
        StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");

        while (tokens.hasMoreTokens())
        {
            wordToken.set(tokens.nextToken());
            context.write(wordToken, new IntWritable(1));
        }
    }
}

ReduceWordCount

public class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private IntWritable count = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int valueSum = 0;
        for (IntWritable val : values)
        {
            valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
    }
}

Driver Code

public class WordCount
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }
        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}

Upvotes: 0

Views: 1515

Answers (1)

Coursal

Reputation: 1387

Things are surprisingly simple in this use case of Hadoop's MapReduce, because Hadoop tends to read input documents line by line, even with FileInputFormat explicitly specified as the input format of an MR job (this goes far beyond the scope of your question, but you can read about map and file splits in Hadoop here and here).

Since each mapper instance is going to have a line as its input, the only thing you have to worry about is:

1. splitting the text into words (after cleaning it up from punctuation and loose spaces, turning everything to lowercase, etc.),

2. getting rid of the duplicates to end up with just the unique words of said line,

3. and emitting every unique word as a key with 1 as its value, classic WordCount style.

For 2. you can use a HashSet, which (as you'd expect) is a Java data structure that only keeps unique elements and ignores duplicates: load every token into it, then iterate over it to write the key-value pairs and send them to the reducer instances.
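As a quick standalone sketch of that deduplication step (the class and variable names here are just for illustration):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DedupDemo
{
    public static void main(String[] args)
    {
        // three "world" tokens and two "hello" tokens go in...
        Set<String> uniqueWords = new HashSet<>(
                Arrays.asList("world", "hello", "world", "hello", "world"));

        // ...but the set keeps each word exactly once
        System.out.println(uniqueWords.size()); // prints 2
    }
}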

This type of application can look like this (I changed the way you tokenize the text in the Map function, because your tokenizer didn't seem to split out each word, only to split on the punctuation symbols):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.*;

public class LineFreq
{
    public static class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
    {
        private Text wordToken = new Text();
        private static final IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            // dividing String into tokens
            String[] tokens = value.toString()
                                .replaceAll("\\d+", "")        // get rid of numbers...
                                .replaceAll("[^a-zA-Z ]", " ") // get rid of punctuation...
                                .toLowerCase()                 // turn every letter to lowercase...
                                .trim()                        // trim the leading/trailing spaces...
                                .replaceAll("\\s+", " ")       // collapse runs of whitespace to a single space...
                                .split(" ");                   // ...and split the line on single spaces
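            // e.g. "hello world! hello! how are you, world?"
            //   becomes ["hello", "world", "hello", "how", "are", "you", "world"]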

            Set<String> word_set = new HashSet<String>();   // set to hold all of the unique words (WITHOUT DUPLICATES)

            // add words to word set, skipping empty tokens
            // (a blank or punctuation-only line would otherwise produce "")
            for(String word : tokens)
                if(!word.isEmpty())
                    word_set.add(word);

            // write each unique word to have one occurrence in this particular line
            for(String word : word_set)
            {
                wordToken.set(word);
                context.write(wordToken, one);
            }

        }
    }

    public static class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
    {
        private IntWritable count = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int valueSum = 0;

            for (IntWritable val : values)
              valueSum += val.get();

            count.set(valueSum);
            context.write(key, count);
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(LineFreq.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
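Note that reusing ReduceWordCount as the combiner (wcJob.setCombinerClass(...)) stays correct here, since the partial sums of 1s computed on the map side add up to the same final line counts in the reducer. To try the job out, something along these lines should work (the jar name and HDFS paths are just placeholders):

hadoop jar linefreq.jar LineFreq /user/you/input /user/you/output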

So we can test it out with the following document as input:

hello world! hello! how are you, world?
i am fine! world world world! hello to you too!
what a wonderful world!
amazing world i must say, indeed

And confirm that word frequency is indeed being computed line-wise, with output like the following (the keys come out sorted, and each key-value pair is tab-separated):

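a	1
am	1
amazing	1
are	1
fine	1
hello	2
how	1
i	2
indeed	1
must	1
say	1
to	1
too	1
what	1
wonderful	1
world	4
you	2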

Upvotes: 0
