user1585111
user1585111

Reputation: 1019

map reduce chaining not executed properly

hi i am finding bit problem with map reduce chaining.i have to form a chain like this

mapper->reducer->mapper

from my first mapper to reducer the flow has been good,and the output data of this reducer is not going to next mapper properly.this is a simple sample of code what i have tried

This is my first mapper

public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
        throws IOException {

    String maxSalary = value.toString().split(",")[4]; 

    outputCollector.collect(new Text("max salary"),new IntWritable(Integer.parseInt(maxSalary)));

}

This is my reducer

public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
            throws IOException {
        int maxSalary = Integer.MIN_VALUE;

        while(values.hasNext()){

            maxSalary = Math.max(maxSalary, values.next().get());
        }
        outputCollector.collect(key, new IntWritable(maxSalary));

    }

This is my next simple Mapper

public void map(Text key, IntWritable value,
            OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
            throws IOException {

        System.out.println(value.toString());
    }

This is my main class which runs the job

JobConf jobConf = new JobConf(jobrunner.class);
jobConf.setJobName("Chaining");

FileInputFormat.setInputPaths(jobConf, new Path("hdfs://localhost:9000/employee_data.txt"));
FileOutputFormat.setOutputPath(jobConf,new Path("hdfs://localhost:9000/chain9.txt"));

JobConf conf1 = new JobConf(false);

ChainMapper.addMapper(jobConf,chainmap.class,LongWritable.class,Text.class,Text.class,IntWritable.class,true,conf1);

JobConf conf2 = new JobConf(false);

ChainReducer.setReducer(jobConf, chainreduce.class,Text.class,IntWritable.class,Text.class,IntWritable.class,true,conf2);

JobConf conf3 = new JobConf(false);

ChainMapper.addMapper(jobConf, nextchainmap.class, Text.class,IntWritable.class,Text.class,IntWritable.class,true,conf3);


JobClient.runJob(jobConf);

I will get max employee salary in my reducer and this has to be passed to next mapper where it would find the employee records with max salary value,how can i accomplish this in the next mapper?any ideas?

Upvotes: 1

Views: 424

Answers (1)

Jean Logeart
Jean Logeart

Reputation: 53829

To chain you second mapper, you need to call ChainReducer.addMapper(...) instead of ChainMapper.addMapper(...).

Upvotes: 2

Related Questions