cloud_anny

Reputation: 37

Partitioner does not seem to work on a single node?

I have written MapReduce code along with a custom partitioner. The custom partitioner sorts the keys based on some condition. I set setNumReduceTasks(6) in the driver class, but I am testing this code on a single machine, and I get only one reducer output file instead of 6. Does the partitioner not work on a single machine? Is a multi-node cluster needed to see the effect of a custom partitioner? Any insight on this will be appreciated.

Upvotes: 1

Views: 189

Answers (3)

Marco99

Reputation: 1659

Take a good look at your custom partitioner. It may be returning the same partition value for all the keys passed into it.

In that case it is an inefficient partitioner: it sends all keys to the same reducer. So even if you set the number of reducers to 6, only one reducer will receive all the key-value pairs and the remaining 5 will have nothing to process.

As a result, you will see output only from the one reducer that processed all the records.
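To illustrate this failure mode, here is a hypothetical degenerate partitioner (not the asker's actual code): it compiles and runs fine, but because getPartition always returns 0, every record lands in the first reducer and the remaining part files come out empty:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Degenerate example: every key maps to partition 0, so with 6 reducers
    // only part-r-00000 ever receives records; the other 5 files stay empty.
    public class SameBucketPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            return 0; // ignores the key and the partition count entirely
        }
    }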

Does the partitioner not work on a single machine? The partitioner also works on a single-machine pseudo-distributed cluster.

Is a multi-node cluster needed to see the effect of a custom partitioner? No.

Upvotes: 0

ganesh_patil

Reputation: 366

The partitioner always runs when the number of reducers is set greater than one, even on a single-node cluster.

I have tested the code below on a single-node cluster and it works as expected:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public final class SortMapReduce extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SortMapReduce(), args);
        System.exit(res);
    }

    public int run(final String[] args) throws Exception {

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = super.getConf();

        Job job = Job.getInstance(conf);

        job.setJarByClass(SortMapReduce.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Each input line is split into a Text key and a Text value.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Person is a custom WritableComparable key class (shown further below).
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Route map output to reducers through the custom partitioner.
        job.setPartitionerClass(PersonNamePartitioner.class);

        // Five reducers -> five output files: part-r-00000 .. part-r-00004.
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        if (job.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    public static class Map extends Mapper<Text, Text, Person, Text> {

        private Person outputKey = new Person();

        @Override
        protected void map(Text pointID, Text firstName, Context context)
                throws IOException, InterruptedException {
            outputKey.set(pointID.toString(), firstName.toString());
            context.write(outputKey, firstName);
        }
    }

    public static class Reduce extends Reducer<Person, Text, Text, Text> {

        private Text pointID = new Text();

        @Override
        public void reduce(Person key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            pointID.set(key.getpointID());
            for (Text firstName : values) {
                context.write(pointID, firstName);
            }
        }
    }
}

Partitioner class:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PersonNamePartitioner extends Partitioner<Person, Text> {

    @Override
    public int getPartition(Person key, Text value, int numPartitions) {
        // Mask off the sign bit instead of using Math.abs():
        // Math.abs(Integer.MIN_VALUE) is still negative and would
        // produce an invalid partition number.
        return (key.getpointID().hashCode() * 127 & Integer.MAX_VALUE) % numPartitions;
    }
}
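The Person key class is not included in the answer. For completeness, here is a minimal sketch of what it would need to provide, assuming it is a WritableComparable carrying the two fields used above (this reconstruction is inferred from the calls in the code, not taken from the original post):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Minimal composite key: serializes both fields and compares
    // by pointID first, then firstName (the secondary sort field).
    public class Person implements WritableComparable<Person> {

        private String pointID;
        private String firstName;

        public void set(String pointID, String firstName) {
            this.pointID = pointID;
            this.firstName = firstName;
        }

        public String getpointID() {
            return pointID;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(pointID);
            out.writeUTF(firstName);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            pointID = in.readUTF();
            firstName = in.readUTF();
        }

        @Override
        public int compareTo(Person other) {
            int cmp = pointID.compareTo(other.pointID);
            return (cmp != 0) ? cmp : firstName.compareTo(other.firstName);
        }
    }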

Run command:

hadoop jar /home/hdfs/SecondarySort.jar org.test.SortMapReduce /demo/data/Customer/acct.txt /demo/data/Customer/output2
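If the partitioner distributes the keys, the output directory should contain one part file per reducer. An expected listing (file names only; sizes, permissions, and timestamps omitted):

    hdfs dfs -ls /demo/data/Customer/output2
    # _SUCCESS
    # part-r-00000  part-r-00001  part-r-00002  part-r-00003  part-r-00004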

Thanks,

Upvotes: 1

gsamaras

Reputation: 73366

I had a two-node cluster on a single machine, and here is exactly what I did on execution.

To specify the number of reducers, for example two:

-D mapred.reduce.tasks=2
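This generic -D option is only picked up when the driver runs through ToolRunner/GenericOptionsParser (as in ganesh_patil's code above), and it must come before the job's own arguments. A hypothetical invocation, reusing the command from the previous answer:

    hadoop jar /home/hdfs/SecondarySort.jar org.test.SortMapReduce -D mapred.reduce.tasks=2 /demo/data/Customer/acct.txt /demo/data/Customer/output2

Note that mapred.reduce.tasks is the old (deprecated) property name; on Hadoop 2.x and later the equivalent is mapreduce.job.reduces, though the old name is still accepted.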

Upvotes: 0
