Reputation: 676
Below is a MapReduce program that counts the words in several text files. My aim is to have the result in descending order by number of appearances.
Unfortunately, the program sorts the output lexicographically by the key; I want a natural ordering of the integer value.
So I added a custom comparator with job.setSortComparatorClass(IntComparator.class), but this doesn't work as expected. I'm getting the following exception:
java.lang.Exception: java.nio.BufferUnderflowException
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:498)
at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:355)
at WordCount$IntComparator.compare(WordCount.java:128)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:987)
at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:100)
at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:64)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1277)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1174)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:609)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:675)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Any help would be appreciated! :)
I've listed the whole program below, as the reason for the exception may lie somewhere I don't suspect. As you can see, I am using the new MapReduce API (org.apache.hadoop.mapreduce.*).
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Counts the words in several text files.
 */
public class WordCount {

    /**
     * Maps lines of text to (word, amount) pairs.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();
        private IntWritable amount = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String textLine = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(textLine);
            while (tokenizer.hasMoreElements()) {
                word.set((String) tokenizer.nextElement());
                context.write(word, amount);
            }
        }
    }

    /**
     * Reduces (word, amount) pairs to (amount, word) list.
     */
    public static class Reduce extends
            Reducer<Text, IntWritable, IntWritable, Text> {

        private IntWritable amount = new IntWritable();
        private int sum;

        @Override
        protected void reduce(Text key, Iterable<IntWritable> valueList,
                Context context) throws IOException, InterruptedException {
            sum = 0;
            for (IntWritable value : valueList) {
                sum += value.get();
            }
            amount.set(sum);
            context.write(amount, key);
        }
    }

    public static class IntComparator extends WritableComparator {

        public IntComparator() {
            super(IntWritable.class);
        }

        private Integer int1;
        private Integer int2;

        @Override
        public int compare(byte[] raw1, int offset1, int length1, byte[] raw2,
                int offset2, int length2) {
            int1 = ByteBuffer.wrap(raw1, offset1, length1).getInt();
            int2 = ByteBuffer.wrap(raw2, offset2, length2).getInt();
            return int2.compareTo(int1);
        }
    }

    /**
     * Job configuration.
     *
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration configuration = new Configuration();
        configuration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));

        Job job = new Job(configuration);
        job.setJobName("WordCount");
        job.setJarByClass(WordCount.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setSortComparatorClass(IntComparator.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileSystem.get(configuration).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);
    }
}
Upvotes: 1
Views: 18696
Reputation: 11
Basically, you need to sort by value. There are two ways to achieve this, but in short you need two MapReduce jobs, i.e. you run a second MapReduce job on the output of the first.
After the normal MapReduce job completes, run another one that takes the output of the first job as its input. In the second job's map phase you can use a custom class as the key, e.g.
class WordCountVo implements WritableComparable<WordCountVo>
and you must override the
public int compareTo(WordCountVo wordCountVo)
method.
In WordCountVo you can keep both the word and the count but compare based on the count only. E.g. these are the member variables for WordCountVo:
private String word;
private Long count;
Now when you receive the key-value pairs in the second reducer, the data will already be sorted by value. All you need to do is write the key-value pairs out using the context. Hope this helps.
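A minimal sketch of such a key class follows; the two member variables are from the answer, while everything else (constructors, serialization, the tie-break) is illustrative:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Illustrative composite key: carries both word and count, but
// orders primarily by count (descending), as the answer describes.
public class WordCountVo implements WritableComparable<WordCountVo> {

    private String word;
    private Long count;

    // No-arg constructor is required: Hadoop instantiates keys reflectively.
    public WordCountVo() {
    }

    public WordCountVo(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readLong();
    }

    @Override
    public int compareTo(WordCountVo wordCountVo) {
        // Descending by count; fall back to the word so that distinct
        // words with equal counts are not treated as the same key.
        int byCount = wordCountVo.count.compareTo(this.count);
        return byCount != 0 ? byCount : this.word.compareTo(wordCountVo.word);
    }
}

The tie-break on the word matters: during the shuffle, keys that compare as equal are grouped into a single reduce call, which would otherwise merge different words sharing a count.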
Upvotes: 1
Reputation: 6686
As quetzalcoatl said, your comparator is not useful, since it is used between the map and reduce phases and not after the reduce phase. So to accomplish this you need to either sort in the cleanup method of the Reducer (sketched below), or write another program to sort the output of the reducer.
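A rough sketch of the cleanup-based variant; the class name and the buffering TreeMap are my assumptions, and it presumes all distinct words fit into the reducer's memory:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Buffers every (count, word) pair and emits them in descending count
// order once all input has been reduced.
public class DescendingSortReducer
        extends Reducer<Text, IntWritable, IntWritable, Text> {

    // Reverse-ordered TreeMap: largest count first; the list collects
    // words that share the same count.
    private final TreeMap<Integer, List<String>> countsToWords =
            new TreeMap<>(Collections.reverseOrder());

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        countsToWords.computeIfAbsent(sum, k -> new ArrayList<>()).add(key.toString());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Runs once after the last reduce() call; write the sorted pairs here.
        for (Entry<Integer, List<String>> entry : countsToWords.entrySet()) {
            for (String word : entry.getValue()) {
                context.write(new IntWritable(entry.getKey()), new Text(word));
            }
        }
    }
}

Note this only yields one globally sorted file with a single reducer; with several reducers, each output file is sorted independently.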
Upvotes: 1
Reputation: 3067
The comparator step occurs between the Mapper and the Reducer, which won't work for you, as you swap the key and value around in the Reducer itself.
The default WritableComparator would normally handle your numerical ordering if the key were IntWritable, except it's getting a Text key, thus resulting in lexicographical ordering.
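To illustrate the type mismatch: a raw-byte sort comparator is only safe when the serialized key type matches what it reads. A descending comparator that would be valid for genuine IntWritable map output keys might look like this sketch (the class name is mine; it only applies to a job whose map output key class really is IntWritable):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

// Valid only when the map output key class is IntWritable:
// the raw key bytes then always contain exactly one serialized int.
public class DescendingIntComparator extends WritableComparator {

    public DescendingIntComparator() {
        super(IntWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int v1 = readInt(b1, s1); // WritableComparator.readInt reads 4 raw bytes
        int v2 = readInt(b2, s2);
        return Integer.compare(v2, v1); // reversed operands for descending order
    }
}

By contrast, a serialized Text key starts with a variable-length length prefix followed by UTF-8 bytes, so interpreting it as a four-byte int can run past the end of the buffer for short words, which would explain the BufferUnderflowException in the question.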
As to why exactly the output at the end isn't sorted by your written-out IntWritable key, I'm unsure. Perhaps it has something to do with the way TextOutputFormat works? You might have to dig deeper into the TextOutputFormat source code for clues on that, but in short, setting the sort comparator probably won't help you here, I'm afraid.
Upvotes: 2