Kevin.Lam
Kevin.Lam

Reputation: 299

Hadoop Combiner prior to secondary sorting

Situation

I am trying to output an inverted index with terms as the key and document number:frequency as the value. The list of values are sorted by frequency in descending order. Ideally, I'd like to do this with only 1 Mapreduce phase/job.

term1 -> (doc3, 2) (doc1, 1) (doc5, 1) 
term2 -> (doc2, 3) (doc3, 2) (doc6, 1)

What I've Tried

The way my program works now is that I create a composite key (term, docNum=count, freq=1) and I create a natural value (docNum=count, freq=1). I pass these key,value pair from MAP. During combine, I sum up the frequencies and pass the sum as new frequency values for composite key and natural value. Finally, in the reduce phase, I output the key and list of values.

Problem

For my composite key, I have the comparator set up so that its sorting the frequency in descending order. However, I initially pass in the term frequency as 1 (so that I can sum them up in combine phase). It seems that the secondary sort comparison happens BEFORE combine. Instead of comparing frequency values AFTER I sum up the frequencies, the comparison occurs before the summation. So in the example above, term2 -> doc2's frequency of 1 would be compared against term2 -> doc2's frequency of 1 instead of term2 -> doc2's frequency of 3 being compared against term2 ->doc3's frequency of 2.

I am not sure how to get the frequencies to be sorted in descending order.

CompositeKey.java (compareTo)

@Override
public int compareTo(TermCompositeKey termCompositeKey) {
  int result = this.term.compareTo(termCompositeKey.getTerm());
  if (result == 0) {
    this.tf.compareTo(termCompositeKey.getTf());

  }
  return result;
} 

CombinerComparator.java

public class TermCombinerComparator extends WritableComparator {
protected TermCombinerComparator() {
  super(TermCompositeKey.class, true);
}

@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable wc1, WritableComparable wc2) {
  int result = 0;
  TermCompositeKey termCompositeKey1 = (TermCompositeKey) wc1;
  TermCompositeKey termCompositeKey2 = (TermCompositeKey) wc2;
  result = termCompositeKey1.getTerm().compareTo(termCompositeKey2.getTerm());
  if (result == 0) {
    result= (int)(termCompositeKey1.getDocPosition() - termCompositeKey2.getDocPosition());
  }
  return result;
}

GroupingComparator.java

public class TermGroupingComparator extends WritableComparator {
protected TermGroupingComparator() {
  super(TermCompositeKey.class, true);
}

@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable wc1, WritableComparable wc2) {
  TermCompositeKey termCompositeKey1 = (TermCompositeKey) wc1;
  TermCompositeKey termCompositeKey2 = (TermCompositeKey) wc2;

  return termCompositeKey1.getTerm().compareTo(termCompositeKey2.getTerm());
}

Upvotes: 1

Views: 288

Answers (0)

Related Questions