Vignesh I

Reputation: 2221

Does the default sorting in MapReduce use the Comparator registered by the WritableComparable class or its compareTo() method?

How does the sort happen in MapReduce before output is passed from the mapper to the reducer? If my mapper's output key is of type IntWritable, does it use the comparator registered for the IntWritable class or the class's compareTo() method, and if so, how is the call made? If not, how is the sort performed, and how is the call made?

Upvotes: 0

Views: 2343

Answers (2)

Matt Fortier

Reputation: 1223

Map outputs are first collected and then sent to the Partitioner, which is responsible for determining which Reducer each record will be sent to (the data is not yet grouped by reduce() call at this point). The default Partitioner uses the key's hashCode() modulo the number of Reducers.
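As a plain-Java sketch of that default behaviour (this mirrors the logic of Hadoop's HashPartitioner, reproduced here as a standalone method rather than the real Hadoop class):

```java
// Sketch of the default HashPartitioner logic:
// partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
// The mask clears the sign bit so negative hash codes still yield a valid index.
public class HashPartitionSketch {
    static int getPartition(int keyHashCode, int numReduceTasks) {
        return (keyHashCode & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        System.out.println(getPartition(42, 4));  // hash 42, 4 reducers -> partition 2
        System.out.println(getPartition(-7, 4));  // negative hash still maps into [0, 4)
    }
}
```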

After that, the Comparator is called to sort the map outputs. The flow looks like this:

Collector --> Partitioner --> Spill --> Comparator --> Local Disk <-- MapOutputServlet

Each Reducer will then copy from the mappers the data that the Partitioner assigned to it, and pass it through a grouping comparator that determines how records are grouped for a single reduce() call:

MapOutputServlet --> Copy to Local Disk --> Group --> Reduce

Before the reduce() calls, the records also go through a sorting phase that determines the order in which they arrive at the reducer. The sorter (a WritableComparator) calls the key's compareTo() method (from the WritableComparable interface).
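To illustrate how grouping interacts with the sorted stream (this is plain Java, not the Hadoop API; the class and method names are made up for the illustration): after the sort, the grouping comparator is applied to adjacent keys, and a run of keys that compare equal becomes a single reduce() invocation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration (not Hadoop API): adjacent sorted keys that the
// grouping comparator considers equal are collapsed into one reduce() call.
public class GroupingSketch {
    static <T> List<List<T>> group(List<T> sorted, Comparator<T> grouper) {
        List<List<T>> groups = new ArrayList<>();
        for (T rec : sorted) {
            if (groups.isEmpty()
                    || grouper.compare(groups.get(groups.size() - 1).get(0), rec) != 0) {
                groups.add(new ArrayList<>());  // new key group -> new reduce() call
            }
            groups.get(groups.size() - 1).add(rec);
        }
        return groups;
    }

    public static void main(String[] args) {
        // keys sorted as (primary, secondary); group on the primary field only
        List<int[]> sorted = List.of(new int[]{1, 5}, new int[]{1, 7}, new int[]{2, 3});
        List<List<int[]>> groups = group(sorted, Comparator.comparingInt((int[] k) -> k[0]));
        System.out.println(groups.size());  // 2: one reduce() call per primary key
    }
}
```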

To give you a better idea, here is how you could implement a basic compareTo(), grouping comparator and sorting comparator for a custom composite key:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {
    private IntWritable primaryField = new IntWritable();
    private IntWritable secondaryField = new IntWritable();

    // No-arg constructor required so Hadoop can instantiate the key by reflection
    public CompositeKey() {
    }

    public CompositeKey(IntWritable p, IntWritable s) {
        this.primaryField.set(p.get());
        this.secondaryField.set(s.get());
    }

    public IntWritable getPrimaryField() {
        return primaryField;
    }

    public IntWritable getSecondaryField() {
        return secondaryField;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.primaryField.write(out);
        this.secondaryField.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.primaryField.readFields(in);
        this.secondaryField.readFields(in);
    }

    // Used by the partitioner to route map outputs with the same primary field
    // to the same reducer instance. If the hash source is simple (a primitive
    // wrapper or so), delegating to its hashCode() is good enough.
    @Override
    public int hashCode() {
        return this.primaryField.hashCode();
    }

    @Override
    public int compareTo(CompositeKey other) {
        if (this.getPrimaryField().equals(other.getPrimaryField())) {
            return this.getSecondaryField().compareTo(other.getSecondaryField());
        } else {
            return this.getPrimaryField().compareTo(other.getPrimaryField());
        }
    }
}


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeGroupingComparator extends WritableComparator {
    public CompositeGroupingComparator() {
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey first = (CompositeKey) a;
        CompositeKey second = (CompositeKey) b;

        // Group on the primary field only: records sharing a primary field
        // are handed to the same reduce() call
        return first.getPrimaryField().compareTo(second.getPrimaryField());
    }
}

public class CompositeSortingComparator extends WritableComparator {
    public CompositeSortingComparator() {
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey first = (CompositeKey) a;
        CompositeKey second = (CompositeKey) b;

        // Full sort: primary field first, then secondary field
        return first.compareTo(second);
    }
}
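To make the job actually use these classes, you would register them in the driver. This fragment assumes the newer org.apache.hadoop.mapreduce.Job API and an existing Configuration object named conf:

```java
// Driver wiring sketch: register the composite key and its comparators.
Job job = Job.getInstance(conf, "composite key example");
job.setMapOutputKeyClass(CompositeKey.class);
job.setSortComparatorClass(CompositeSortingComparator.class);       // sort phase
job.setGroupingComparatorClass(CompositeGroupingComparator.class);  // grouping phase
```

If no sort comparator is registered, the framework falls back to comparing keys with the compareTo() of the WritableComparable key class, which is exactly the default behaviour the question asks about.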

Upvotes: 1

Karthik

Reputation: 1811

The MapReduce framework takes care of comparing for us for all the default key types like IntWritable, DoubleWritable, etc. But if you have a user-defined key type, you need to implement the WritableComparable interface.

WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.

Note that hashCode() is frequently used in Hadoop to partition keys. It's important that your implementation of hashCode() returns the same result across different instances of the JVM. Note also that the default hashCode() implementation in Object does not satisfy this property.
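A minimal sketch of the difference (the class and method names here are illustrative): a hashCode() computed from field values is deterministic across JVMs, whereas Object's default identity hash can differ from run to run.

```java
// Field-based hashCode: the same inputs always produce the same value,
// so the same key is routed to the same partition on any JVM.
public class StableHashSketch {
    static int stableHash(int counter, long timestamp) {
        final int prime = 31;
        int result = 1;
        result = prime * result + counter;
        result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(stableHash(1, 2L));        // 994, on every run and every JVM
        System.out.println(new Object().hashCode());  // identity hash: can vary per run
    }
}
```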

Example:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
   // Some data
   private int counter;
   private long timestamp;

   public void write(DataOutput out) throws IOException {
     out.writeInt(counter);
     out.writeLong(timestamp);
   }

   public void readFields(DataInput in) throws IOException {
     counter = in.readInt();
     timestamp = in.readLong();
   }

   public int compareTo(MyWritableComparable o) {
     int thisValue = this.counter;
     int thatValue = o.counter;
     return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
   }

   public int hashCode() {
     final int prime = 31;
     int result = 1;
     result = prime * result + counter;
     result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
     return result;
   }
 }

From: https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/WritableComparable.html

Upvotes: 0
