Sachin

Reputation: 1715

How to override the default sorting of Hadoop

I have a map-reduce job in which the keys are numbers from 1 to 200. I expected the output as (number, value) pairs in numeric order, but I'm getting this instead:

1    value
10   value
11   value
   :
   : 
2    value
20   value
   :
   :
3    value

I know this is due to the default behavior of Map-Reduce to sort keys in ascending order.

I want my keys to be sorted in numerical order only. How can I achieve this?

Upvotes: 0

Views: 1339

Answers (2)

1218985

Reputation: 8032

The default WritableComparator in the MapReduce framework sorts keys numerically when the key type is IntWritable. I suspect your job is emitting a Text key, which results in lexicographical ordering. Please have a look at the sample code, which uses an IntWritable key to emit the values:

1) Mapper Implementation

package com.stackoverflow.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SourceFileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private static final String DEFAULT_DELIMITER = "\t";

    private IntWritable keyToEmit = new IntWritable();
    private Text valueToEmit = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the tab-delimited line once and reuse the fields
        String[] fields = value.toString().split(DEFAULT_DELIMITER);
        // Emitting the key as IntWritable makes the shuffle sort numerically
        keyToEmit.set(Integer.parseInt(fields[0].trim()));
        valueToEmit.set(fields[1]);
        context.write(keyToEmit, valueToEmit);
    }

}

2) Reducer Implementation

package com.stackoverflow.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SourceFileReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException,
            InterruptedException {
        for (Text value : values) {
            context.write(key, value);
        }
    }

}

3) Driver Implementation

package com.stackoverflow.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SourceFileDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Create configuration
        Configuration conf = new Configuration(true);

        // Create job (the new Job(conf, name) constructor is deprecated in Hadoop 2.x)
        Job job = Job.getInstance(conf, "SourceFileDriver");
        job.setJarByClass(SourceFileDriver.class);

        // Setup MapReduce
        job.setMapperClass(SourceFileMapper.class);
        job.setReducerClass(SourceFileReducer.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete output if exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute job
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

}
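
If the key really has to stay as Text, the other route is a custom sort comparator. The comparison logic can be sketched in plain Java as below. This is only an illustration, not Hadoop code: NumericTextKeyComparator is a hypothetical name, and in a real job the same logic would have to live in a WritableComparator subclass registered with job.setSortComparatorClass(...).

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical helper: compares string keys by their integer value.
public class NumericTextKeyComparator implements Comparator<String> {

    @Override
    public int compare(String left, String right) {
        // Parse both keys and compare as numbers rather than as characters
        return Integer.compare(Integer.parseInt(left.trim()), Integer.parseInt(right.trim()));
    }

    public static void main(String[] args) {
        String[] keys = {"1", "10", "11", "2", "20", "3"};
        Arrays.sort(keys, new NumericTextKeyComparator());
        System.out.println(Arrays.toString(keys)); // [1, 2, 3, 10, 11, 20]
    }
}
```

Switching the key type to IntWritable, as in the code above, is usually simpler because no parsing happens on every comparison.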

Thank you!

Upvotes: 1

Donald Miner

Reputation: 39913

If I had to take a guess, I'd say that you are storing your numbers as Text objects and not IntWritable objects.
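
To see why the key type matters, here is a small plain-Java sketch (no Hadoop involved) that sorts the same keys once as strings and once as integers. It assumes that string ordering of ASCII digits matches how Text keys compare; KeyOrderDemo and its method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class KeyOrderDemo {

    // Sorts the keys as strings, which mimics lexicographic ordering of Text keys
    static List<String> sortAsText(List<Integer> keys) {
        List<String> asText = new ArrayList<>();
        for (int k : keys) {
            asText.add(Integer.toString(k));
        }
        Collections.sort(asText);
        return asText;
    }

    // Sorts the keys as integers, which mimics numeric ordering of IntWritable keys
    static List<Integer> sortAsInt(List<Integer> keys) {
        List<Integer> asInt = new ArrayList<>(keys);
        Collections.sort(asInt);
        return asInt;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(20, 3, 11, 1, 10, 2);
        System.out.println(sortAsText(keys)); // [1, 10, 11, 2, 20, 3]
        System.out.println(sortAsInt(keys));  // [1, 2, 3, 10, 11, 20]
    }
}
```

The first line of output has the same shape as the output in the question.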

Either way, once you have more than one reducer, the keys are sorted only within each reducer's output. The output as a whole won't be totally sorted unless the partitioner sends contiguous key ranges to each reducer.
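
One way to get a total order with several reducers is to partition by key range, so that reducer 0 gets the smallest keys, reducer 1 the next range, and so on. Below is a rough plain-Java sketch of that mapping, assuming the keys are integers from 1 to 200 as in the question. RangePartitionSketch and partitionFor are hypothetical names; in a real job this logic would go into the getPartition method of a Partitioner subclass set with job.setPartitionerClass(...), or you could look at Hadoop's TotalOrderPartitioner.

```java
public class RangePartitionSketch {

    // Assumed key range, taken from the question (keys 1 to 200)
    static final int MIN_KEY = 1;
    static final int MAX_KEY = 200;

    // Maps a key to a partition so that larger keys never land in a lower partition
    static int partitionFor(int key, int numPartitions) {
        // Clamp out-of-range keys so the result always stays in [0, numPartitions - 1]
        int clamped = Math.max(MIN_KEY, Math.min(MAX_KEY, key));
        int span = MAX_KEY - MIN_KEY + 1;
        return (int) (((long) (clamped - MIN_KEY) * numPartitions) / span);
    }

    public static void main(String[] args) {
        int reducers = 4;
        for (int key : new int[] {1, 50, 51, 100, 101, 200}) {
            System.out.println(key + " -> partition " + partitionFor(key, reducers));
        }
    }
}
```

With 4 reducers, keys 1 to 50 go to partition 0 and keys 151 to 200 go to partition 3, so concatenating the part files in order gives a fully sorted result.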

Upvotes: 3
