Reputation: 31252
Consider this class (from Hadoop: The Definitive Guide, 3rd edition):
import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
    // ^^ TextPair

    // vv TextPairComparator
    public static class Comparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public Comparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
                if (cmp != 0) {
                    return cmp;
                }
                return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                               b2, s2 + firstL2, l2 - firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

    static {
        WritableComparator.define(TextPair.class, new Comparator());
    }
    // ^^ TextPairComparator

    // vv TextPairFirstComparator
    public static class FirstComparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public FirstComparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof TextPair && b instanceof TextPair) {
                return ((TextPair) a).first.compareTo(((TextPair) b).first);
            }
            return super.compare(a, b);
        }
    }
    // ^^ TextPairFirstComparator

    // vv TextPair
}
// ^^ TextPair
There are two kinds of comparators defined: one sorts by first followed by second, which is the default comparator. The other sorts by first ONLY, which is the FirstComparator.
If I have to use FirstComparator for sorting my keys, how do I achieve that? That is, how do I override my default comparator with the FirstComparator I defined above?
Secondly, how would I unit test this, since the output of the map job is not sorted?
Upvotes: 0
Views: 3900
Reputation: 10093
If I have to use FirstComparator for sorting my keys, how do I achieve that? That is, how do I override my default comparator with the FirstComparator I defined above?
I assume you expect a method something like setComparator(firstComparator). As far as I know there is no such method. The keys are sorted (on the mapper side) using the compareTo() of the Writable type representing the keys. In your case, the compareTo() method checks the first value and then the second one. In other words, the keys will be sorted by the first value, and then the keys in the same group (i.e. having the same first value) will be sorted by their second value.
All in all, this means that your keys will always be sorted by the first value (and by the second value when the first values are equal). That in turn means there is no need for a different comparator (FirstComparator) that looks only at the first value, because that ordering is already achieved by the compareTo() method of your TextPair class.
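To illustrate the point, here is a minimal sketch in plain Java, with no Hadoop dependency. StringPair is a hypothetical stand-in for TextPair that uses String fields instead of Text, and CompareToOrderDemo with its helper methods is made up for this example. It only shows that a list sorted by a first-then-second compareTo() is also in order by the first field alone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for TextPair that uses plain Strings instead of Hadoop's Text.
final class StringPair implements Comparable<StringPair> {
    final String first;
    final String second;

    StringPair(String first, String second) {
        this.first = first;
        this.second = second;
    }

    // Same ordering rule as TextPair.compareTo(): first, then second as a tie-breaker.
    @Override
    public int compareTo(StringPair other) {
        int cmp = first.compareTo(other.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(other.second);
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}

class CompareToOrderDemo {
    // Returns a new list sorted with the natural (compareTo) ordering.
    static List<StringPair> sortedCopy(List<StringPair> keys) {
        List<StringPair> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        return copy;
    }

    // True if the list is in non-decreasing order of the first field alone.
    static boolean isOrderedByFirst(List<StringPair> keys) {
        for (int i = 1; i < keys.size(); i++) {
            if (keys.get(i - 1).first.compareTo(keys.get(i).first) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<StringPair> keys = new ArrayList<>();
        keys.add(new StringPair("b", "2"));
        keys.add(new StringPair("a", "9"));
        keys.add(new StringPair("b", "1"));
        keys.add(new StringPair("a", "3"));

        List<StringPair> sorted = sortedCopy(keys);
        for (StringPair key : sorted) {
            System.out.println(key);
        }
        System.out.println("ordered by first: " + isOrderedByFirst(sorted));
    }
}
```

The tie-breaker on second only decides the order among keys whose first values are equal, so it never disturbs the ordering by first.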
On the other hand, if FirstComparator sorts the keys completely differently, the only solution is to move the logic in FirstComparator to the compareTo() method of the Writable class representing your key. I don't see any reason why you wouldn't do that. If you already have FirstComparator and want to reuse it, you can instantiate it and invoke it in the compareTo() method of the TextPair Writable.
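A rough sketch of that delegation idea, again in plain Java rather than against the Hadoop classes. DelegatingPair and FIRST_ONLY are hypothetical names for this example; FIRST_ONLY plays the role that an instance of FirstComparator would play inside TextPair.compareTo().

```java
import java.util.Comparator;

// Hypothetical key type whose compareTo() delegates to a reusable comparator.
final class DelegatingPair implements Comparable<DelegatingPair> {
    // Stand-in for an instance of FirstComparator: looks at the first field only.
    static final Comparator<DelegatingPair> FIRST_ONLY =
            (a, b) -> a.first.compareTo(b.first);

    final String first;
    final String second;

    DelegatingPair(String first, String second) {
        this.first = first;
        this.second = second;
    }

    // The ordering logic lives in the comparator; compareTo() just invokes it.
    @Override
    public int compareTo(DelegatingPair other) {
        return FIRST_ONLY.compare(this, other);
    }
}

class DelegatingPairDemo {
    public static void main(String[] args) {
        DelegatingPair a1 = new DelegatingPair("a", "1");
        DelegatingPair a2 = new DelegatingPair("a", "2");
        DelegatingPair b1 = new DelegatingPair("b", "1");

        // Keys with the same first field compare as equal under this ordering.
        System.out.println(a1.compareTo(a2));
        // Keys with different first fields are ordered by that field.
        System.out.println(a1.compareTo(b1) < 0);
    }
}
```

One thing to keep in mind with this design: two keys that differ only in second now compare as equal, which makes compareTo() inconsistent with an equals() that checks both fields. Also, as far as I can tell, the TextPair in the question registers its raw Comparator through WritableComparator.define(), so that registration would probably need to change as well for a new ordering to take effect.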
You might also want to take a look at the GroupingComparator, which is used to decide which keys are grouped together in the same call of the reduce() method. Since you didn't describe exactly what you want to achieve, I can't say for sure whether this will be helpful.
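To make the grouping idea concrete, here is a small simulation in plain Java. It is not Hadoop code: CompositeKey, GroupingSketch, and the group() helper are hypothetical names, and the loop only mimics how consecutive sorted keys that a grouping comparator treats as equal would end up in the same reduce() call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical composite key with plain String fields (not Hadoop's TextPair).
final class CompositeKey {
    final String first;
    final String second;

    CompositeKey(String first, String second) {
        this.first = first;
        this.second = second;
    }
}

class GroupingSketch {
    // Compares on the first field only, similar in intent to FirstComparator.
    static final Comparator<CompositeKey> BY_FIRST =
            (a, b) -> a.first.compareTo(b.first);

    // Full ordering: first, then second as a tie-breaker.
    static final Comparator<CompositeKey> BY_FIRST_THEN_SECOND = (a, b) -> {
        int cmp = a.first.compareTo(b.first);
        return cmp != 0 ? cmp : a.second.compareTo(b.second);
    };

    // Splits an already sorted key list into runs of consecutive keys
    // that the grouping comparator considers equal.
    static List<List<CompositeKey>> group(List<CompositeKey> sortedKeys,
                                          Comparator<CompositeKey> grouping) {
        List<List<CompositeKey>> groups = new ArrayList<>();
        CompositeKey previous = null;
        for (CompositeKey key : sortedKeys) {
            if (previous == null || grouping.compare(previous, key) != 0) {
                groups.add(new ArrayList<>()); // start a new group
            }
            groups.get(groups.size() - 1).add(key);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<CompositeKey> sortedKeys = Arrays.asList(
                new CompositeKey("a", "3"),
                new CompositeKey("a", "9"),
                new CompositeKey("b", "1"));

        System.out.println("groups by first only: "
                + group(sortedKeys, BY_FIRST).size());
        System.out.println("groups by full key: "
                + group(sortedKeys, BY_FIRST_THEN_SECOND).size());
    }
}
```

Grouping on first only puts both "a" keys into one group while they stay sorted by second inside it, which is the usual reason for combining a full sort order with a first-only grouping comparator.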
Secondly, how would I unit test this, since the output of the map job is not sorted?
Unit testing, as the name says, implies testing a single unit of code (most of the time a method/function/procedure). If you want to unit-test your reduce method you have to provide the interesting input cases and to check that the method under test outputs the expected result. More concretely, you have to create/mock a sorted Iterable over your keys and invoke your reduce function with it. Unit testing a reduce method shouldn't rely on the execution of the corresponding map method.
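As a minimal sketch of such a test, assuming the reduce logic can be pulled out into a plain method: FirstValueReducerSketch and its reduce() method are hypothetical and do not use the Hadoop Reducer API. The test hands the method a hand-built, already sorted Iterable and checks the result, without running any map code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class FirstValueReducerSketch {
    // Hypothetical reduce logic: returns "key<TAB>first value",
    // relying on the values arriving in sorted order.
    static String reduce(String key, Iterable<String> sortedValues) {
        Iterator<String> it = sortedValues.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException("no values for key " + key);
        }
        return key + "\t" + it.next();
    }

    public static void main(String[] args) {
        // The test builds the sorted input by hand instead of running a mapper.
        List<String> sortedValues = Arrays.asList("1", "2", "5");
        String result = reduce("b", sortedValues);
        if (!"b\t1".equals(result)) {
            throw new AssertionError("unexpected result: " + result);
        }
        System.out.println("reduce test passed");
    }
}
```

Because the input is constructed in the test itself, the ordering guarantee that the framework would normally provide becomes an explicit precondition of the test.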
Upvotes: 2