Reputation: 27544
Here is a log file:
2011-10-26 06:11:35 user1 210.77.23.12
2011-10-26 06:11:45 user2 210.77.23.17
2011-10-26 06:11:46 user3 210.77.23.12
2011-10-26 06:11:47 user2 210.77.23.89
2011-10-26 06:11:48 user2 210.77.23.12
2011-10-26 06:11:52 user3 210.77.23.12
2011-10-26 06:11:53 user2 210.77.23.12
...
I want to use MapReduce to count the log entries per user (the third field of each line) and sort the result by that count in descending order. In other words, I want the result to be displayed as:
user2 4
user3 2
user1 1
Now I have two questions:
By default, MapReduce will split the log file on spaces and carriage returns, but I only need the third field of each line; that is, I don't care about fields such as 2011-10-26, 06:11:35, and 210.77.23.12. How do I let MapReduce omit them and pick up only the user field?
By default, MapReduce will sort the result by the key instead of the value. How can I make MapReduce sort the result by value (the number of log entries)?
Thank you.
Upvotes: 3
Views: 3494
Reputation: 692
For your first question:
You should probably pass the whole line to the mapper, keep only the third token, and emit (user, 1) every time.
public class AnalyzeLogs
{
    public static class FindFriendMapper extends Mapper<Object, Text, Text, IntWritable> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // The log fields are whitespace-separated: date, time, user, IP
            String[] tempStrings = value.toString().split("\\s+");
            context.write(new Text(tempStrings[2]), new IntWritable(1));
        }
    }
}
For your second question, I believe you cannot avoid having a second MR job after that (I cannot think of any other way). The reducer of the first job will just aggregate the values and emit a sum for each key, sorted by key. Which is not yet what you need.
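Outside Hadoop, the net effect of that first job (tokenize each line, emit (user, 1), then sum per user) can be sketched in plain Java. The class and method names here are illustrative, not part of any Hadoop API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LogCountSketch {
    // Simulates the map step (emit (user, 1)) followed by the summing reduce.
    public static Map<String, Integer> countByUser(String[] logLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            String user = line.split("\\s+")[2]; // fields: date, time, user, IP
            counts.merge(user, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] log = {
            "2011-10-26 06:11:35 user1 210.77.23.12",
            "2011-10-26 06:11:45 user2 210.77.23.17",
            "2011-10-26 06:11:46 user3 210.77.23.12",
            "2011-10-26 06:11:47 user2 210.77.23.89",
            "2011-10-26 06:11:48 user2 210.77.23.12",
            "2011-10-26 06:11:52 user3 210.77.23.12",
            "2011-10-26 06:11:53 user2 210.77.23.12"
        };
        System.out.println(countByUser(log)); // user2 appears 4 times, user3 twice, user1 once
    }
}
```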
So you pass the output of this first job as input to a second MR job. The objective of this second job is to do a somewhat special sort by value before passing the records to the reducers (which will do absolutely nothing).
Our Mapper for the second job will be the following:
public static class SortLogsMapper extends Mapper<Object, Text, Text, NullWritable> {

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        // The whole input line ("user count") becomes the key; NullWritable is a singleton
        context.write(value, NullWritable.get());
    }
}
As you can see, we do not use the value in this mapper at all. Instead, we have created a key that contains our value (our key is in "key1 value1" format).
What remains to be done now is to tell the framework that it should sort based on value1 and not the whole "key1 value1" string. So we will implement a custom SortComparator:
public static class LogDescComparator extends WritableComparator
{
    protected LogDescComparator()
    {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2)
    {
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split(" "); // the synthetic keys look like "user2 4"
        String[] t2Items = t2.toString().split(" ");
        // Compare the "real" value part of our synthetic key numerically, in descending
        // order. (A plain String compareTo would misorder counts, putting "10" before "2".)
        int t1Value = Integer.parseInt(t1Items[1]);
        int t2Value = Integer.parseInt(t2Items[1]);
        return Integer.compare(t2Value, t1Value);
    }
}
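The comparator's logic can be checked outside Hadoop by sorting plain strings with the same rule. This is a standalone sketch; the "user count" key format is the synthetic one built by SortLogsMapper above:

```java
import java.util.Arrays;
import java.util.Comparator;

public class DescByCountSketch {
    // Same rule as LogDescComparator: compare the count part numerically, descending.
    public static String[] sortDesc(String[] syntheticKeys) {
        String[] sorted = syntheticKeys.clone();
        Arrays.sort(sorted, Comparator.comparingInt(
                (String k) -> Integer.parseInt(k.split(" ")[1])).reversed());
        return sorted;
    }

    public static void main(String[] args) {
        String[] keys = {"user1 1", "user3 2", "user2 4"};
        System.out.println(Arrays.toString(sortDesc(keys)));
        // A lexicographic compare would misorder two-digit counts such as 10 vs 4:
        System.out.println(Arrays.toString(sortDesc(new String[]{"user2 4", "user4 10"})));
    }
}
```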
You can set your custom comparator with: job.setSortComparatorClass(LogDescComparator.class);
The reducer of the job should do nothing. However, if we don't set a reducer at all, the sorting of the mapper keys will not be done (and we need that). In the new mapreduce API there is no IdentityReducer; instead, set the base Reducer class itself as the reducer of the second MR job. Its default reduce() simply passes every (key, value) pair through, so no reduction is done but the mapper's synthetic keys are still sorted in the way we specified.
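Putting the second job together, the driver configuration would look roughly like this. This is a sketch, not a complete program: it assumes the mapper and comparator classes above are on the classpath, and the job name and argument indices for the paths are illustrative:

```java
Job sortJob = Job.getInstance(new Configuration(), "sort logs by count");
sortJob.setJarByClass(AnalyzeLogs.class);
sortJob.setMapperClass(SortLogsMapper.class);
sortJob.setReducerClass(Reducer.class);        // base Reducer = identity in the new API
sortJob.setSortComparatorClass(LogDescComparator.class);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(NullWritable.class);
sortJob.setNumReduceTasks(1);                  // one reducer gives a single, globally sorted file
FileInputFormat.addInputPath(sortJob, new Path(args[1]));   // output dir of the first job
FileOutputFormat.setOutputPath(sortJob, new Path(args[2]));
sortJob.waitForCompletion(true);
```

Note that with more than one reduce task, each output file would be sorted only within itself; setNumReduceTasks(1) keeps the final result fully ordered.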
Upvotes: 2