USB

Reputation: 6139

Calculating 'n' maximum values in Hadoop

I have a scenario: the output of a previous job (job1) is the input to the next job.

In the next job I need to find the i keys having the maximum values, e.g. for i=3, the 3 keys with the maximum values (i will be a custom parameter).

How should I approach this?

Should I calculate the max in job2's mapper (the keys will be unique, since the output comes from the previous reducer), or find the max in the second job's reducer? And either way, how do I find i keys?

Update

I tried it this way: instead of emitting the value as the value in the reducer, I emitted the value as the key, so I can get the values in ascending order. Then I wrote the next MR job, whose mapper simply emits the key/value pairs.

The reducer finds the max of the keys, but here I am stuck again: that doesn't work once we try to get the id back, because only the id is unique; the values are not.
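Roughly what my job1 reducer does now (a simplified sketch; the class name, the sum aggregation, and the exact types are just placeholders for whatever job1 really computes):

    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    // job1's reducer: the aggregated value is written as the key and the id as the
    // value, so that the next job's shuffle sorts the records by value while the id
    // still travels along with it, even when several ids share the same value
    public static class Job1Reducer extends Reducer<IntWritable, DoubleWritable, DoubleWritable, IntWritable> {
        @Override
        protected void reduce(IntWritable id, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0; // placeholder for whatever aggregation job1 actually does
            for (DoubleWritable v : values) {
                sum += v.get();
            }
            context.write(new DoubleWritable(sum), new IntWritable(id.get()));
        }
    }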

How can I solve this?

Can anyone suggest a solution?

Thanks in advance.

Upvotes: 1

Views: 738

Answers (1)

Aleksei Shestakov

Reputation: 2538

You can find the top i keys with a PriorityQueue. Simple code to illustrate the idea:

import java.io.IOException;
import java.util.PriorityQueue;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

public static class TopNMapper extends Mapper<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
    private static class MyPair implements Comparable<MyPair> {
        public final int key;
        public final double value;

        MyPair(int key, double value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public int compareTo(MyPair o) {
            // natural ordering by value: the queue becomes a min-heap,
            // so poll() always removes the pair with the smallest value
            return Double.compare(value, o.value);
        }
    }

    private final PriorityQueue<MyPair> topN = new PriorityQueue<MyPair>();

    @Override
    protected void map(IntWritable key, DoubleWritable value, Context context) throws IOException, InterruptedException {
        if (Double.isNaN(value.get())) {
            return; // skip values that are not a number
        }
        topN.add(new MyPair(key.get(), value.get()));
        if (topN.size() <= 50) { // simple optimization: trim in batches instead of after every record
            return;
        }
        while (topN.size() > 3) { // retain only the 3 largest values in the queue
            topN.poll();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        while (topN.size() > 3) {
            topN.poll(); // retain only the 3 largest values in the queue
        }
        for (MyPair myPair : topN) { // write the top 3 elements (in no particular order)
            context.write(new IntWritable(myPair.key), new DoubleWritable(myPair.value));
        }
    }
}

If you run this mapper as a single map task over all the input, you should get the 3 keys with the maximum values.
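If the input has more than one split, each map task will only emit its own local top 3, so you still need a single reducer to merge them into the global top i. A rough sketch of how the driver and a merging reducer could look; the property name top.n, the class names, and the SequenceFile input format are my assumptions, not anything fixed by the question:

    import java.io.IOException;
    import java.util.Comparator;
    import java.util.PriorityQueue;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TopNDriver {

        // Merges the per-mapper top lists; runs as the single reduce task.
        public static class TopNReducer extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
            // min-heap on the value, so poll() drops the smallest of the retained pairs
            private final PriorityQueue<double[]> topN = new PriorityQueue<double[]>(11, new Comparator<double[]>() {
                @Override
                public int compare(double[] a, double[] b) {
                    return Double.compare(a[1], b[1]);
                }
            });
            private int n;

            @Override
            protected void setup(Context context) {
                n = context.getConfiguration().getInt("top.n", 3); // the custom parameter i
            }

            @Override
            protected void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) {
                for (DoubleWritable value : values) {
                    topN.add(new double[] { key.get(), value.get() });
                    if (topN.size() > n) {
                        topN.poll(); // keep only the n largest values seen so far
                    }
                }
            }

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                for (double[] pair : topN) { // write the global top n (in no particular order)
                    context.write(new IntWritable((int) pair[0]), new DoubleWritable(pair[1]));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInt("top.n", 3); // the custom parameter i (the sample mapper above hard-codes 3)

            Job job = Job.getInstance(conf, "top-n");
            job.setJarByClass(TopNDriver.class);
            job.setMapperClass(TopNMapper.class);   // the mapper shown above
            job.setReducerClass(TopNReducer.class);
            job.setNumReduceTasks(1);               // one reducer so the local top lists are merged globally
            job.setInputFormatClass(SequenceFileInputFormat.class); // assuming job1 wrote a SequenceFile of (IntWritable, DoubleWritable)
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }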

Upvotes: 1
