vinu.m.19

Reputation: 505

Hadoop MapReduce: how to combine the first reducer's output and the first mapper's input as input for a second mapper?

I need to implement a piece of functionality using MapReduce.

The requirement is as follows:

  1. The input for the mapper is a file containing two columns: productId and salesCount
  2. The reducer's output is the sum of salesCount per productId

The requirement is that I need to calculate salesCount / sum(salesCount) for each record.

For this I am planning to use nested (chained) MapReduce jobs. But the second mapper needs the first reducer's output as well as the first mapper's input.

How can I implement this? Or is there an alternative way?

Regards, Vinu

Upvotes: 3

Views: 3309

Answers (2)

naveenkumarbv

Reputation: 117

With the use case in hand, I believe we don't need two different mappers/MapReduce jobs to achieve this (as an extension to the answer given in the comments above).

Let's assume you have a very large input file split into multiple blocks in HDFS. When you trigger a MapReduce job with this file as input, multiple mappers (one per input split) will start executing in parallel.

In your mapper implementation, read each line of input and write the productId as the key and the saleCount as the value to the context. This data is passed to the reducer.

We know that in an MR job all records with the same key are passed to the same reducer. In your reducer implementation you can therefore calculate the sum of all saleCounts for a particular productId.

Note: I'm not sure about the value 'salesCount' in your numerator.

Assuming it's the count of occurrences of a particular product, use a counter to accumulate the total sales count in the same loop where you calculate SUM(saleCount). So we have:

totalCount -> count of occurrences of a product
sumSaleCount -> sum of the saleCount values for that product

Now you can directly divide the two values: totalCount/sumSaleCount.
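Stripped of the Hadoop API, the reducer-side loop could look like the following. This is a minimal Hadoop-free sketch (the `List` stands in for the reducer's values iterator, and the class/method names are illustrative only); note the cast to `double`, since a plain `long/long` division would truncate the fraction to zero.

```java
import java.util.Arrays;
import java.util.List;

public class SaleCountRatio {

    // Plain-Java stand-in for the reducer body: a single pass over the
    // saleCount values of one productId computes both numbers at once.
    static double ratio(List<Long> saleCounts) {
        long totalCount = 0;    // counter: number of occurrences of the product
        long sumSaleCount = 0;  // SUM(saleCount) for the product
        for (long saleCount : saleCounts) {
            totalCount++;
            sumSaleCount += saleCount;
        }
        // Cast to double before dividing, otherwise the result truncates to 0.
        return (double) totalCount / sumSaleCount;
    }

    public static void main(String[] args) {
        // e.g. a product seen 3 times with saleCounts 2, 3, 5 -> 3 / 10
        System.out.println(ratio(Arrays.asList(2L, 3L, 5L)));
    }
}
```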

Hope this helps! Please let me know if you have a different use case in mind.

Upvotes: 0

shazin

Reputation: 21923

You can use ChainMapper and ChainReducer to pipe Mappers and Reducers the way you want. Have a look at the ChainMapper/ChainReducer documentation.

The following is similar to the code snippet you would need to implement:

JobConf mapBConf = new JobConf(false);
JobConf reduceConf = new JobConf(false);

ChainMapper.addMapper(conf, FirstMapper.class, FirstMapperInputKey.class, FirstMapperInputValue.class,
    FirstMapperOutputKey.class, FirstMapperOutputValue.class, false, mapBConf);

ChainReducer.setReducer(conf, FirstReducer.class, FirstMapperOutputKey.class, FirstMapperOutputValue.class,
    FirstReducerOutputKey.class, FirstReducerOutputValue.class, true, reduceConf);

ChainReducer.addMapper(conf, SecondMapper.class, FirstReducerOutputKey.class, FirstReducerOutputValue.class,
    SecondMapperOutputKey.class, SecondMapperOutputValue.class, false, null);

// Note: a chained job supports only one reduce phase ([MAP+ / REDUCE MAP*]),
// so a SecondReducer cannot be set on the same conf; it would have to run
// in a separate job, e.g.:
// ChainReducer.setReducer(secondJobConf, SecondReducer.class, SecondMapperOutputKey.class,
//     SecondMapperOutputValue.class, SecondReducerOutputKey.class, SecondReducerOutputValue.class,
//     true, reduceConf);

Or, if you don't want to use multiple Mappers and Reducers, you can do the following:

public static class ProductIndexerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {

    private final Text productId = new Text();
    private final LongWritable salesCount = new LongWritable();

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        String[] values = value.toString().split("\t");
        productId.set(values[0]);
        salesCount.set(Long.parseLong(values[1]));
        output.collect(productId, salesCount);
    }

}

// The output value is a DoubleWritable (set the job's output value class
// accordingly): salesCount / sum(salesCount) is a fraction, and emitting it
// as a LongWritable would truncate it to zero.
public static class ProductIndexerReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, DoubleWritable> {

    private final DoubleWritable fraction = new DoubleWritable();

    @Override
    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        // First pass: buffer the counts and compute the total. Hadoop reuses
        // the Writable instance returned by the iterator, so copy the value
        // out of it rather than storing the reference.
        List<Long> items = new ArrayList<Long>();
        long total = 0;
        while (values.hasNext()) {
            long count = values.next().get();
            total += count;
            items.add(count);
        }
        // Second pass: emit salesCount / sum(salesCount) for each record,
        // using double division so the fraction is not truncated.
        for (long count : items) {
            fraction.set((double) count / total);
            output.collect(key, fraction);
        }
    }

}
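Minus the Hadoop types, the two-pass structure of that reducer boils down to the sketch below (illustrative only; the class and method names are made up). The point of the second pass is that every buffered salesCount is divided by the total as a double; with plain `long` arithmetic each salesCount/total would truncate to 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerProductFractions {

    // First pass: total the counts. Second pass: emit count / total
    // for each original value, one fraction per input record.
    static List<Double> fractions(List<Long> saleCounts) {
        long total = 0;
        for (long c : saleCounts) {
            total += c;
        }
        List<Double> out = new ArrayList<Double>();
        for (long c : saleCounts) {
            out.add((double) c / total);  // double division, not long/long
        }
        return out;
    }

    public static void main(String[] args) {
        // saleCounts 3, 1, 4 -> total 8 -> fractions 0.375, 0.125, 0.5
        System.out.println(fractions(Arrays.asList(3L, 1L, 4L)));
    }
}
```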


Upvotes: 3
