Reputation: 27
I am currently working on a Hadoop project in Java. My objective is to write a MapReduce job that counts the line frequency of every word, i.e. not the total number of times a word occurs in the input file, but the number of lines it occurs in. If a word occurs more than once in a single line, it should still be counted only once, since we are only counting lines. I have a basic word-count MapReduce working, which I will post below, but I am a little lost on how to count line frequency instead of the full word count. Any help would be appreciated, thanks a lot.
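To make the goal concrete, given this (made-up) two-line input:

hello hello world
hello code

the output I want is:

code 1
hello 2
world 1

since hello appears three times overall, but only on two lines.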
MapWordCount
public class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Text wordToken = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // dividing the line into tokens at the given delimiter characters
        StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");
        while (tokens.hasMoreTokens())
        {
            wordToken.set(tokens.nextToken());
            context.write(wordToken, new IntWritable(1));
        }
    }
}
ReduceWordCount
public class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private IntWritable count = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int valueSum = 0;
        for (IntWritable val : values)
        {
            valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
    }
}
Driver Code
public class WordCount
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));

        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
Upvotes: 0
Views: 1515
Reputation: 1387
Things are surprisingly simple in this use case of Hadoop's MapReduce, because Hadoop's default input format, TextInputFormat (a subclass of FileInputFormat), already reads the input documents line by line and hands each line to a separate map() call (how input files are divided into splits and records goes far beyond the scope of your question, but you can read up on map and file splits in the Hadoop documentation).
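In other words, each map() call receives one record per line: the key is the line's byte offset in the file and the value is the line's text. If you wanted to make this explicit in your driver, it would just be the following (redundant) line, after importing org.apache.hadoop.mapreduce.lib.input.TextInputFormat:

wcJob.setInputFormatClass(TextInputFormat.class); // already the default; shown only for illustration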
Since each map() call is going to have a single line as its input, the only things you have to worry about are:
1. splitting the text into words (after cleaning it up: stripping punctuation and digits, trimming stray spaces, turning everything to lowercase, etc.),
2. getting rid of the duplicates so you end up with just the unique words of that line,
3. and emitting every unique word as the key with 1 as the value, classic WordCount style.
For step 2 you can use a HashSet, which (as you'd expect) is a Java data structure that keeps only unique elements and ignores duplicates: load every token into it, then iterate over it to write the key-value pairs that get sent to the reducer instances.
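As a quick, self-contained illustration of that deduplication behavior:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class HashSetDemo
{
    public static void main(String[] args)
    {
        // "hello" is added twice, but the set stores it only once
        Set<String> unique = new HashSet<>(Arrays.asList("hello", "hello", "world"));
        System.out.println(unique); // prints the two distinct elements, e.g. [hello, world]
    }
}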
Such an application can look like the following (I changed the way you tokenize the text in the map function, because your StringTokenizer delimiter set doesn't include the space character, so it split only at punctuation symbols instead of between words):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class LineFreq
{
    public static class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        private Text wordToken = new Text();
        private static final IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            // dividing the line into clean, lowercase tokens
            String[] tokens = value.toString()
                    .replaceAll("\\d+", "")        // get rid of numbers...
                    .replaceAll("[^a-zA-Z ]", " ") // get rid of punctuation...
                    .toLowerCase()                 // turn every letter to lowercase...
                    .trim()                        // trim leading/trailing spaces...
                    .replaceAll("\\s+", " ")       // ...and collapse repeated whitespace
                    .split(" ");

            // set holding only the unique words of this line (duplicates are ignored)
            Set<String> wordSet = new HashSet<>();
            for (String word : tokens)
            {
                if (!word.isEmpty()) // skip the empty token produced when a line is blank
                    wordSet.add(word);
            }

            // write each unique word with one occurrence for this particular line
            for (String word : wordSet)
            {
                wordToken.set(word);
                context.write(wordToken, one);
            }
        }
    }
    public static class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable count = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int valueSum = 0;
            for (IntWritable val : values)
                valueSum += val.get();

            count.set(valueSum);
            context.write(key, count);
        }
    }
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(LineFreq.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));

        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
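As a side note, reusing ReduceWordCount as the combiner is safe here, since partial sums of the 1s can be added in any order. To run the job, the class would be packaged into a jar and submitted as usual (the jar name and HDFS paths below are just placeholders):

hadoop jar linefreq.jar LineFreq /user/me/input /user/me/output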
So we can test it out with the following document as input:
hello world! hello! how are you, world?
i am fine! world world world! hello to you too!
what a wonderful world!
amazing world i must say, indeed
And confirm that word frequency is indeed being computed line-wise, with the following output (Hadoop sorts the keys alphabetically before they reach the reducers):
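a	1
am	1
amazing	1
are	1
fine	1
hello	2
how	1
i	2
indeed	1
must	1
say	1
to	1
too	1
what	1
wonderful	1
world	4
you	2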
Upvotes: 0