Reputation: 1715
I have a map-reduce job in which the keys are numbers from 1-200. My intended output was (number,value) in the number order. But I'm getting the output as :
1 value
10 value
11 value
:
:
2 value
20 value
:
:
3 value
I know this is due to the default behavior of Map-Reduce to sort keys in ascending order.
I want my keys to be sorted in numerical order only. How can I achieve this?
Upvotes: 0
Views: 1339
Reputation: 8032
The default WritableComparator
in MapReduce framework would normally handle your numerical ordering if the key was IntWritable
. I suspect it's getting a Text
key thus resulting in lexicographical ordering in your case. Please have a look at the sample code which uses IntWritable
key to emit the values:
1) Mapper Implementaion
package com.stackoverflow.answers.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SourceFileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
private static final String DEFAULT_DELIMITER = "\t";
private IntWritable keyToEmit = new IntWritable();
private Text valueToEmit = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
keyToEmit.set(Integer.parseInt(line.split(DEFAULT_DELIMITER)[0]));
valueToEmit.set(line.split(DEFAULT_DELIMITER)[1]);
context.write(keyToEmit, valueToEmit);
}
}
2) Reducer Implementation
package com.stackoverflow.answers.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SourceFileReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
for (Text value : values) {
context.write(key, value);
}
}
}
3) Driver Implementation
package com.stackoverflow.answers.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SourceFileDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Path inputPath = new Path(args[0]);
Path outputDir = new Path(args[1]);
// Create configuration
Configuration conf = new Configuration(true);
// Create job
Job job = new Job(conf, "SourceFileDriver");
job.setJarByClass(SourceFileDriver.class);
// Setup MapReduce
job.setMapperClass(SourceFileMapper.class);
job.setReducerClass(SourceFileReducer.class);
job.setNumReduceTasks(1);
// Specify key / value
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// Input
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, outputDir);
job.setOutputFormatClass(TextOutputFormat.class);
// Delete output if exists
FileSystem hdfs = FileSystem.get(conf);
if (hdfs.exists(outputDir))
hdfs.delete(outputDir, true);
// Execute job
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
}
Thank you!
Upvotes: 1
Reputation: 39913
If I had to take a guess, I'd say that you are storing your numbers as Text objects and not IntWritable objects.
Either way, once you have more than one reducer, only the items within a reducer will be sorted, but it won't be totally sorted.
Upvotes: 3