Krle

Reputation: 70

Hadoop: Secondary sort does not work

I have implemented an algorithm in Hadoop 1.2.1 in which the reducer code relies on secondary sorting. However, when I run the algorithm, one reducer receives sorted tuples but the other does not. I've spent a lot of time trying to figure out why, without any success.

Does anyone know what might be the problem? I assume it has to do with the secondary sort code.

Here is the code that implements the secondary sorting:

Composite key

    public class CompositeKey implements WritableComparable<CompositeKey>{
        public String key;
        public Integer position;
        @Override
        public void readFields(DataInput arg0) throws IOException {
            key = WritableUtils.readString(arg0);
            position = arg0.readInt();
        }
        @Override
        public void write(DataOutput arg0) throws IOException {
            WritableUtils.writeString(arg0, key);
            arg0.writeLong(position);
        }
        @Override
        public int compareTo(CompositeKey o) {
            int result = key.compareTo(o.key);
            if(0 == result) {
                result = position.compareTo(o.position);
            }
            return result;
        }
    }

KeyComparator

    public class CompositeKeyComparator extends WritableComparator {
        protected CompositeKeyComparator() {
            super(CompositeKey.class, true);
        }
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            CompositeKey k1 = (CompositeKey)w1;
            CompositeKey k2 = (CompositeKey)w2;

            int result = k1.key.compareTo(k2.key);
            if(0 == result) {
                result = -1* k1.position.compareTo(k2.position);
            }
            return result;
        }

    }

Grouping Comparator

    public class NaturalKeyGroupingComparator extends WritableComparator {
        protected NaturalKeyGroupingComparator() {
            super(CompositeKey.class, true);
        }   
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            CompositeKey k1 = (CompositeKey)w1;
            CompositeKey k2 = (CompositeKey)w2;

            return k1.key.compareTo(k2.key);
        }

    }

Partitioner

    public class NaturalKeyPartitioner extends Partitioner<CompositeKey, ReduceValue> {
        @Override
        public int getPartition(CompositeKey key, ReduceValue val, int numPartitions) {
            int hash = key.key.hashCode();
            int partition = hash & Integer.MAX_VALUE % numPartitions;
            return partition;
        }
    }

Job configuration

    //secondary sort
    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

If I execute this either in the pseudo-distributed environment or on the cluster, I notice that one reducer gets sorted tuples while the other does not. For example, here is an excerpt showing the tuples received by two reducers (the first column is the primary key, and the second is the secondary key):

    First reducer:
    a1 0 
    a1 1 
    a1 11 
    a1 16 
    a1 27 
    a1 28 
    a1 34 
    a1 35 
    a1 37 
    a1 38 
    a1 43 
    a1 44 
    a1 46 
    a1 48 
    a1 50 
    a1 54 
    a1 55 
    a1 56 
    a1 57 
    a1 60 
    a1 61 
    a1 63 
    a1 64 
    a1 66 
    a1 69 
    a1 70 
    a1 72 
    a1 75 
    a1 76 
    a1 78 
    a1 79 
    a1 80 
    a1 84 
    a1 85 
    a1 86 
    a1 87 
    a1 88 
    a1 91 
    a1 92 
    a1 97 
    a1 102   
    a1 106    
    a1 108  
    a1 109 
    a1 110 
    a1 111 
    a1 116     
    a1 118  
    a1 119 
    a1 120  

    Second reducer:
    a2 87 
    a2 115
    a2 65 
    a2 90 
    a2 68 
    a2 119    
    a2 91 
    a2 0 
    a2 70 
    a2 3 
    a2 8 
    a2 9 
    a2 10 
    a2 71 
    a2 110   
    a2 16 
    a2 17 
    a2 20 
    a2 21 
    a2 23 
    a2 26 
    a2 72 
    a2 27 
    a2 94 
    a2 29 
    a2 30 
    a2 31 
    a2 75 
    a2 95 
    a2 36 
    a2 76 
    a2 117  
    a2 39 
    a2 40 
    a2 41 
    a2 42 
    a2 97 
    a2 79 
    a2 44 
    a2 45 
    a2 98 
    a2 46 
    a2 80 
    a2 49 
    a2 82 
    a2 50 
    a2 83 
    a2 100 
    a2 84 
    a2 112     
    a2 57 
    a2 59 
    a2 113      
    a2 60 
    a2 114       
    a2 61 

Upvotes: 0

Views: 711

Answers (1)

Chris Gerken

Reputation: 16392

I think it's because in your serialize/deserialize logic for CompositeKey you write the position as a long (writeLong) but read it back as an integer (readInt). That will mess up the comparison logic, because the key you compare after deserialization is not exactly the key you wrote to the context.
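
To see the effect in isolation, here is a minimal sketch that reproduces the mismatch with plain java.io streams instead of Hadoop. The class and method names (PositionRoundTrip, writeAsLong, writeAsInt, readAsInt) are made up for illustration and are not part of your code or of the Hadoop API; the sketch only assumes that the DataOutput/DataInput passed to write() and readFields() behave like DataOutputStream/DataInputStream for these two calls.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class PositionRoundTrip {

    // Mirrors the question's write(): the int is widened and written as 8 bytes.
    static byte[] writeAsLong(int position) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(position);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Symmetric alternative: write exactly the 4 bytes that readInt() expects.
    static byte[] writeAsInt(int position) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(position);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Mirrors the question's readFields(): only the first 4 bytes are consumed.
    static int readAsInt(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (int position : new int[] {3, 87, 115}) {
            System.out.println(position
                    + " -> mismatched read: " + readAsInt(writeAsLong(position))
                    + ", matched read: " + readAsInt(writeAsInt(position)));
        }
    }
}
```

Because writeLong emits the high-order bytes first, readInt picks up the upper half of the long, so every non-negative position in this sketch reads back as 0 in the mismatched case. If the same thing happens inside your comparator, all positions for a given key would compare as equal, which would explain an arbitrary order within a key. One way to make the two sides agree is to change arg0.writeLong(position) to arg0.writeInt(position) in CompositeKey.write().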

Upvotes: 1
