Reputation: 11
I'm new to Hadoop and have finished a typical "count the IP addresses in a log" exercise. Now I'm trying to sort the output by running a second MapReduce job immediately after the first. Almost everything is working, except that the output isn't being sorted the way I'd like. Here's a snippet of my output:
-101 71.59.196.132
-115 59.103.11.163
-1175 59.93.51.231
-119 127.0.0.1
-1193 115.186.128.19
-1242 59.93.64.161
-146 192.35.79.70
I can't figure out why, for example, 1175 is considered a lower value than 119. I've tried playing around with Comparators, but it hasn't had any positive effect.
The Map and Reduce jobs for the data collection are both standard and non-problematic. They output a list much like the snippet above, but completely unsorted. The SortMap, SortReduce, and Runner classes are a little different. Here's my Runner class:
public class Runner {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Runner.class);
        JobConf sortStage = new JobConf(Runner.class);

        conf.setJobName("ip-count");
        conf.setMapperClass(IpMapper.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setReducerClass(IpReducer.class);
        conf.setOutputValueGroupingComparator(IntWritable.Comparator.class);

        //Input and output from command line...
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        sortStage.setJobName("sort-stage");
        sortStage.setMapperClass(SortMapper.class);
        sortStage.setMapOutputKeyClass(Text.class);
        sortStage.setMapOutputValueClass(IntWritable.class);
        sortStage.setReducerClass(SortReducer.class);
        sortStage.setOutputKeyClass(IntWritable.class);
        sortStage.setOutputValueClass(IntWritable.class);

        //Input and output from command line...
        FileInputFormat.setInputPaths(sortStage, new Path(args[2]));
        FileOutputFormat.setOutputPath(sortStage, new Path(args[3]));

        JobClient.runJob(conf);
        JobClient.runJob(sortStage);
    }
}
The "SortMapper":
public class SortMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    public void map(LongWritable fileOffset, Text lineContents,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        //Grab the whole line, formatted as (Count \t IP), e.g., 101 128.10.3.40
        String ip = lineContents.toString();
        //Output it with a count of 1
        output.collect(new Text(ip), one);
    }
}
The "SortReducer":
public class SortReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, IntWritable, Text> {

    public void reduce(Text ip, Iterator<IntWritable> counts,
            OutputCollector<IntWritable, Text> output, Reporter reporter)
            throws IOException {
        String delimiter = "[\t]";
        String[] splitString = ip.toString().split(delimiter);
        //Count represented as 0-count to easily sort in descending order vs. ascending
        int sortCount = 0 - Integer.parseInt(splitString[0]);
        output.collect(new IntWritable(sortCount), new Text(splitString[1]));
    }
}
This is just a single-node job, so I don't think partitioning is a factor. Sorry if this is a trivial matter - I've spent an embarrassing amount of time on the problem and couldn't find anything that dealt with this particular sorting issue. Any advice would be greatly appreciated!
Upvotes: 1
Views: 168
Reputation: 5647
Your numbers are being compared 'alphabetically', because they are strings. In alphabetical sorting, aabc comes before aac; turn those characters into digits and, by the same rule, 1123 comes before 113.
If you want numeric comparison, you are going to have to convert them to integers.
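To see the difference concretely, here's a small standalone Java sketch (no Hadoop dependencies, using made-up values from your output) comparing the same counts as strings and as integers. In your job, the equivalent fix would be to emit the count itself as the map output key, as an IntWritable, rather than leaving it inside a Text key:

public class CompareDemo {
    public static void main(String[] args) {
        // As strings, comparison runs character by character:
        // '-', '1', '1' match, then '5' < '7', so "-115" sorts before "-1175"
        System.out.println("-115".compareTo("-1175") < 0);    // true

        // As integers, magnitude decides: -1175 is less than -115,
        // so it sorts first in ascending numeric order
        System.out.println(Integer.compare(-115, -1175) > 0); // true
    }
}

With IntWritable keys, Hadoop's shuffle phase will apply this numeric ordering for you before the reducer runs.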
Upvotes: 1