brain storm

Reputation: 31252

How to get the keys sorted by a custom comparator in a MapReduce job in Hadoop?

Consider this class (from Hadoop: The Definitive Guide, 3rd edition):

import java.io.*;
import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;

  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
  // ^^ TextPair

  // vv TextPairComparator
  public static class Comparator extends WritableComparator {

    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public Comparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {

      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        if (cmp != 0) {
          return cmp;
        }
        return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                       b2, s2 + firstL2, l2 - firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }
  }

  static {
    WritableComparator.define(TextPair.class, new Comparator());
  }
  // ^^ TextPairComparator

  // vv TextPairFirstComparator
  public static class FirstComparator extends WritableComparator {

    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public FirstComparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {

      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      if (a instanceof TextPair && b instanceof TextPair) {
        return ((TextPair) a).first.compareTo(((TextPair) b).first);
      }
      return super.compare(a, b);
    }
  }
  // ^^ TextPairFirstComparator

// vv TextPair
}
// ^^ TextPair

Two comparators are defined: the default Comparator, which sorts by first and then by second, and FirstComparator, which sorts by first only.

If I have to use FirstComparator for sorting my keys, how do I achieve that? That is, how do I override the default comparator with the FirstComparator defined above?

Secondly, how would I unit test this, since the output of the map job is not sorted?

Upvotes: 0

Views: 3900

Answers (1)

Razvan

Reputation: 10093

"If I have to use FirstComparator for sorting my keys, how do I achieve that? That is, how do I override the default comparator with the FirstComparator defined above?"

I assume you expect a method along the lines of setComparator(firstComparator). There is one: Job.setSortComparatorClass() in the new API (JobConf.setOutputKeyComparatorClass() in the old one) sets the comparator used to sort the map output keys, so you can pass TextPair.FirstComparator.class to it. If you don't set it, the keys are sorted with the comparator registered for the key class (here the Comparator registered through WritableComparator.define()), which orders them the same way as compareTo(): by the first value and then, for keys with the same first value, by the second value.

In other words, with the default comparator your keys are already sorted by the first value, with the second value breaking ties. If that ordering is all you need, a separate comparator that looks only at the first value adds nothing to the sort, because the default comparator already achieves it.

On the other hand, if the comparator you want sorts the keys completely differently, either pass it to setSortComparatorClass() as described above, or move its logic into the compareTo() method of the Writable class representing your key. If you already have the comparator and want to reuse it, you can instantiate it and invoke it from the compareTo() method of the TextPair Writable.
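
To make the difference between the two orderings concrete, here is a minimal sketch in plain Java with no Hadoop dependency. The Pair class and the FULL and FIRST_ONLY comparators are hypothetical stand-ins for TextPair, its default ordering and FirstComparator, and List.sort stands in for the framework's sort. Both orderings put the first values in ascending order; only the full ordering also fixes the order of the second values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for TextPair: two strings, no Hadoop types.
final class Pair {
  final String first;
  final String second;

  Pair(String first, String second) {
    this.first = first;
    this.second = second;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }
}

class SortOrderSketch {
  // Mirrors TextPair.compareTo(): first, then second.
  static final Comparator<Pair> FULL =
      Comparator.comparing((Pair p) -> p.first).thenComparing(p -> p.second);

  // Mirrors FirstComparator: pairs with the same first value compare as equal.
  static final Comparator<Pair> FIRST_ONLY = Comparator.comparing((Pair p) -> p.first);

  // Returns a sorted copy; the input list is left untouched.
  static List<Pair> sorted(List<Pair> input, Comparator<Pair> order) {
    List<Pair> copy = new ArrayList<>(input);
    copy.sort(order); // List.sort is stable, so ties keep their input order here
    return copy;
  }

  public static void main(String[] args) {
    List<Pair> keys = Arrays.asList(
        new Pair("b", "2"), new Pair("a", "9"), new Pair("b", "1"), new Pair("a", "3"));
    System.out.println(sorted(keys, FULL));
    System.out.println(sorted(keys, FIRST_ONLY));
  }
}
```

With FIRST_ONLY the relative order of keys sharing a first value is whatever the sort leaves behind, so code that depends on the order of the second values should not rely on it.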

You might also want to take a look at the grouping comparator (set with Job.setGroupingComparatorClass()), which decides which keys are handled together in the same call of the reduce() method. Since you didn't describe exactly what you want to achieve, I can't say for sure whether this will be helpful.
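
The effect of a grouping comparator can be sketched without Hadoop. The code below is a simplified model, not Hadoop's implementation: it walks keys that are already sorted by (first, second) and starts a new group, standing for a new reduce() call, whenever the grouping comparator reports a difference from the previous key. The group helper, the BY_FIRST comparator and the string-array keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupingSketch {
  // Keys are {first, second} arrays; grouping looks at the first element only.
  static final Comparator<String[]> BY_FIRST = Comparator.comparing((String[] k) -> k[0]);

  // Splits already-sorted keys into runs that the grouping comparator treats as equal.
  static List<List<String[]>> group(List<String[]> sortedKeys, Comparator<String[]> grouping) {
    List<List<String[]>> groups = new ArrayList<>();
    String[] previous = null;
    for (String[] key : sortedKeys) {
      if (previous == null || grouping.compare(previous, key) != 0) {
        groups.add(new ArrayList<>()); // a new reduce() call would start here
      }
      groups.get(groups.size() - 1).add(key);
      previous = key;
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String[]> sortedKeys = Arrays.asList(
        new String[] {"a", "3"}, new String[] {"a", "9"},
        new String[] {"b", "1"}, new String[] {"b", "2"});
    for (List<String[]> g : group(sortedKeys, BY_FIRST)) {
      System.out.println(g.get(0)[0] + " -> " + g.size() + " keys");
    }
  }
}
```

In this model, sorting by both fields and grouping by the first one gives each group its keys ordered by the second field.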

"Secondly, how would I unit test this, since the output of the map job is not sorted?"

Unit testing, as the name says, means testing a single unit of code (most of the time a method, function or procedure). If you want to unit test your reduce method, you have to provide the interesting input cases and check that the method under test produces the expected output. More concretely, you have to create or mock a key together with an Iterable over its values, in the order the framework would deliver them, and invoke your reduce function with them. Unit testing a reduce method shouldn't rely on the execution of the corresponding map method.
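
As an illustration of testing reduce logic in isolation, here is a minimal sketch in plain Java. The concatenate method is a hypothetical stand-in for the body of a reduce(); a real Reducer would write to its Context instead of returning a value. The check hands it a key and a hand-built, already ordered list of values, so no map phase is involved.

```java
import java.util.Arrays;
import java.util.List;

class ReduceLogicSketch {
  // Hypothetical reduce logic: joins the values of one key, in the order received.
  static String concatenate(String key, Iterable<String> values) {
    StringBuilder out = new StringBuilder(key).append(':');
    String separator = "";
    for (String value : values) {
      out.append(separator).append(value);
      separator = ",";
    }
    return out.toString();
  }

  public static void main(String[] args) {
    // The test supplies the values in the order it expects the framework to deliver them.
    List<String> values = Arrays.asList("1", "2", "3");
    String result = concatenate("a", values);
    if (!result.equals("a:1,2,3")) {
      throw new AssertionError("unexpected output: " + result);
    }
    System.out.println(result);
  }
}
```

For a real Reducer the same idea applies with a mocked Context or a test harness such as Apache MRUnit; which one fits depends on your Hadoop version and is not covered by this sketch.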

Upvotes: 2
