Reputation: 18575
The data looks like this; the first field of each line is a number:
3 ...
1 ...
2 ...
11 ...
I want to sort these lines numerically by the first field rather than alphabetically, so the sorted output should look like this:
1 ...
2 ...
3 ...
11 ...
But Hadoop keeps giving me this:
1 ...
11 ...
2 ...
3 ...
How do I correct it?
Upvotes: 16
Views: 12081
Reputation: 5156
For streaming with older Hadoop versions (which may use -jobconf instead of -D for configuration), you can sort by key:
-jobconf stream.num.map.output.key.fields=2 \
-jobconf mapreduce.partition.keycomparator.options="-k2,2nr" \
-jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
With stream.num.map.output.key.fields=2, the first two columns of each map output line form the key, as key field 1 and key field 2.
mapreduce.partition.keycomparator.options="-k2,2nr" means: sort by the 2nd key field (the range from field 2 to field 2), treated as a numeric value, in reverse order.
It works much like the Linux sort command.
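To make the option string concrete, here is a rough Python emulation of what "-k2,2nr" asks for: order the records by the second key field, read as a number, largest first. The tab separator, the sample records and the helper name sort_k2_numeric_reverse are assumptions made for illustration; this is not Hadoop's comparator code.

```python
def sort_k2_numeric_reverse(lines, sep="\t"):
    """Order lines by their 2nd field as a number, largest first (like sort -k2,2nr)."""
    def second_field(line):
        # Field numbering in the option string is 1-based, so field 2 is index 1.
        return float(line.split(sep)[1])
    return sorted(lines, key=second_field, reverse=True)


if __name__ == "__main__":
    # Hypothetical map output records: key field 1, key field 2, value.
    records = ["a\t3\tx", "b\t11\ty", "c\t2\tz"]
    for line in sort_k2_numeric_reverse(records):
        print(line)
```

Dropping reverse=True would correspond to "-k2,2n", and comparing the raw strings instead of float(...) would correspond to omitting the n flag.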
Upvotes: 1
Reputation: 744
Assuming you are using Hadoop Streaming, you need to use the KeyFieldBasedComparator class.
Add -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator to the streaming command.
Specify the kind of sorting you need with mapred.text.key.comparator.options. Some useful options are -n (numeric sort) and -r (reverse sort).
EXAMPLE :
Create an identity mapper and an identity reducer with the following code.
This is mapper.py and reducer.py (both files have the same content):
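#!/usr/bin/env python
# Identity mapper/reducer: echo every input line unchanged (minus surrounding whitespace).
import sys

for line in sys.stdin:
    # print(...) with a single argument works on both Python 2 and Python 3.
    print(line.strip())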
This is the input.txt
1
11
2
20
7
3
40
This is the streaming command:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options=-n \
-input /user/input.txt \
-output /user/output.txt \
-file ~/mapper.py \
-mapper ~/mapper.py \
-file ~/reducer.py \
-reducer ~/reducer.py
And you will get the required output:
1
2
3
7
11
20
40
NOTE :
I have used a simple single-key input. If you have multiple key fields and/or partitions, adjust mapred.text.key.comparator.options accordingly. Since I do not know your use case, my example is limited to this.
An identity mapper is needed because an MR job needs at least one mapper to run.
An identity reducer is needed because the shuffle/sort phase does not run in a pure map-only job.
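Before submitting the job, the expected result can be checked locally. The sketch below is only a single-process stand-in for the example above, roughly matching a job with one reducer: the function names identity and simulate_job are made up for illustration, and sorted(..., key=int) merely imitates what the -n comparator option requests.

```python
def identity(lines):
    """Stand-in for mapper.py / reducer.py: emit each line unchanged, stripped."""
    return [line.strip() for line in lines]


def simulate_job(lines):
    """Identity map, numeric sort of the keys, identity reduce."""
    mapped = identity(lines)
    shuffled = sorted(mapped, key=int)  # imitates the numeric (-n) key comparison
    return identity(shuffled)


if __name__ == "__main__":
    sample = ["1", "11", "2", "20", "7", "3", "40"]  # contents of input.txt
    print("\n".join(simulate_job(sample)))
```

Replacing key=int with no key at all reproduces the string ordering that the job yields without the comparator settings.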
Upvotes: 26
Reputation: 10650
Hadoop's default comparator compares your keys based on the Writable type (more precisely, WritableComparable) you use. If you are dealing with IntWritable or LongWritable, it will sort them numerically.
I assume you are using Text in your example, so you end up with lexicographic (string) order, which is why 11 sorts before 2.
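The difference between the two orderings can be reproduced outside Hadoop. This is a small sketch in Python, not Hadoop code: text_order compares the UTF-8 bytes of the keys, which is my stand-in for how Text keys are ordered, and numeric_order compares parsed integers, as an IntWritable key would be. Both function names are hypothetical.

```python
def text_order(keys):
    """Sort keys by their UTF-8 bytes (stand-in for ordering Text keys)."""
    encoded = sorted(k.encode("utf-8") for k in keys)
    return [b.decode("utf-8") for b in encoded]


def numeric_order(keys):
    """Sort keys by integer value (stand-in for ordering IntWritable keys)."""
    return sorted(int(k) for k in keys)


if __name__ == "__main__":
    keys = ["3", "1", "2", "11"]
    print(text_order(keys))     # ['1', '11', '2', '3']
    print(numeric_order(keys))  # [1, 2, 3, 11]
```

The first result matches the output in the question: "11" is placed before "2" because the comparison goes character by character and "1" sorts before "2".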
In special cases, however, you can also write your own comparator.
For example (for testing purposes only), here is a quick sample that changes the sort order of Text keys: it treats them as integers and produces numerical sort order:
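import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

// Orders Text keys by their integer value instead of byte-wise.
public class MyComparator extends WritableComparator {
    public MyComparator() {
        super(Text.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            // A serialized Text starts with a vint length prefix; skip it before decoding.
            int n1 = WritableUtils.decodeVIntSize(b1[s1]);
            int n2 = WritableUtils.decodeVIntSize(b2[s2]);
            String v1 = Text.decode(b1, s1 + n1, l1 - n1);
            String v2 = Text.decode(b2, s2 + n2, l2 - n2);
            // parseInt throws NumberFormatException if a key is not an integer.
            int v1Int = Integer.parseInt(v1.trim());
            int v2Int = Integer.parseInt(v2.trim());
            return Integer.compare(v1Int, v2Int);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
}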
In the job driver class, set:
Job job = new Job();
...
job.setSortComparatorClass(MyComparator.class);
Upvotes: 7