Suresh J

Reputation: 31

Reducer not picking up mapper output files

I have 4 files in a folder, and the folder location is my input path argument. I need to find the word count of each file individually and write the result to a file with the same name as the input file.

I have written a mapper class which writes its output correctly to the specified files, but that output is not being processed by the reducer. What I did wrong is that I didn't use 'context' while writing the mapper output, so nothing is passed to the reducer and a blank output is produced. Still, the mapper executed as desired and placed the files at the correct location, with the expected file names. I want shuffle and sort and the reducer to work on these files, i.e. I want those files to be passed to the reducer. Please correct me. Thanks.

Mapper

package com.oracle.hadoop.multiwordcount;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiWordCountMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    protected String filenamekey;
    private RecordWriter<Text, LongWritable> writer;

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Read the line
        String line = value.toString();

        // Split the line into words
        String[] words = line.split(" ");

        // Assign count(1) to each word
        for (String word : words) {
            // NOTE: this writes straight to the side RecordWriter and never
            // calls context.write(), so nothing enters the shuffle/sort phase
            // and the reducer receives no input
            writer.write(new Text(word), new LongWritable(1));
        }

    }

    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();

        // extract parent folder and filename
        filenamekey = path.getParent().getName() + "/" + path.getName();

        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenamekey);

        // We need to override the getDefaultWorkFile path to stop the file
        // being created in the _temporary/taskid folder
        TextOutputFormat<Text, LongWritable> tof = new TextOutputFormat<Text, LongWritable>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };

        // create a record writer that will write to the desired output
        // subfolder
        writer = tof.getRecordWriter(context);
    }

    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}

Reducer

package com.oracle.hadoop.multiwordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiWordCountReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    /*
     * private MultipleOutputs multiouputs;
     *
     * protected void setup(Context context) throws java.io.IOException,
     * InterruptedException { multiouputs = new MultipleOutputs(context); }
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws java.io.IOException, InterruptedException {
        // Sum the List of values
        long sum = 0;
        for (LongWritable value : values) {
            sum = sum + value.get();
        }

        // Assign Sum to corresponding Word
        context.write(key, new LongWritable(sum));
    }

    /*
     * protected void cleanup(Context context) throws java.io.IOException,
     * InterruptedException { multiouputs.close(); }
     */

}

Driver

package com.oracle.hadoop.multiwordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiWordCountJob implements Tool {
    private Configuration conf;

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        @SuppressWarnings("deprecation")
        Job mwcj = new Job(getConf());

        // setting the job name
        mwcj.setJobName("Multiple file WordCount Job");

        // to call this as a jar
        mwcj.setJarByClass(this.getClass());

        // setting custom mapper class
        mwcj.setMapperClass(MultiWordCountMapper.class);

        // setting custom reducer class
        mwcj.setReducerClass(MultiWordCountReducer.class);

        // setting no of reducers
        // mwcj.setNumReduceTasks(0);

        // setting custom partitioner class
        // mwcj.setPartitionerClass(WordCountPartitioner.class);

        // setting mapper output key class: K2
        mwcj.setMapOutputKeyClass(Text.class);

        // setting mapper output value class: V2
        mwcj.setMapOutputValueClass(LongWritable.class);

        // setting reducer output key class: K3
        mwcj.setOutputKeyClass(Text.class);

        // setting reducer output value class: V3
        mwcj.setOutputValueClass(LongWritable.class);

        // setting the input format class, i.e. for K1, V1
        mwcj.setInputFormatClass(TextInputFormat.class);

        // setting the output format class
        LazyOutputFormat.setOutputFormatClass(mwcj, TextOutputFormat.class);
        // mwcj.setOutputFormatClass(TextOutputFormat.class);

        // setting the input file path
        FileInputFormat.addInputPath(mwcj, new Path(args[0]));

        // setting the output folder path
        FileOutputFormat.setOutputPath(mwcj, new Path(args[1]));

        Path outputpath = new Path(args[1]);
        // delete the output folder if it exists
        outputpath.getFileSystem(conf).delete(outputpath, true);

        // execute the job and return the status
        return mwcj.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(),
                new MultiWordCountJob(), args);

        System.out.println("My Status: " + status);
    }

}

Upvotes: 0

Views: 703

Answers (2)

vefthym

Reputation: 7462

Use MultipleOutputs, instead of writing directly to a file, and then use the context.write() method as usual to pass key-value pairs to the reducer.

Of course, as siddhartha jain says, you cannot have a reduce phase if you specify numReduceTasks as 0. In that case, the job ends at the map phase.

Quoting MultipleOutputs:

The MultipleOutputs class simplifies writing to additional outputs other than the job default output via the OutputCollector passed to the map() and reduce() methods of the Mapper and Reducer implementations.
...
When named outputs are used within a Mapper implementation, key/values written to a named output are not part of the reduce phase; only key/values written to the job OutputCollector are part of the reduce phase.

For handling each input file individually, see my answer in your related post.
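A minimal sketch of that approach, assuming the mapper prefixes each word with its file name so the reducer can route each count to a per-file output (the '#' separator, the composite key, and the per-file base output path are illustrative choices, not part of the question's code):

// Mapper: emit through the framework so the pairs reach shuffle/sort.
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    for (String word : value.toString().split(" ")) {
        // filenamekey is set in setup(), as in the question's mapper
        context.write(new Text(filenamekey + "#" + word), new LongWritable(1));
    }
}

// Reducer: sum the counts, then route each result to an output file
// derived from the input file name.
// needs: import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class MultiWordCountReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    private MultipleOutputs<Text, LongWritable> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, LongWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // split the composite key back into "folder/filename" and the word
        String[] parts = key.toString().split("#", 2);
        // the third argument is the base output path; MultipleOutputs appends
        // a -r-00000 suffix, and LazyOutputFormat in the driver suppresses the
        // empty default part-r-* files
        multipleOutputs.write(new Text(parts[1]), new LongWritable(sum), parts[0]);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        multipleOutputs.close();
    }
}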

Upvotes: 0

siddhartha jain

Reputation: 1006

In your driver class, the number of reducers you set is 0:

// setting no of reducers

mwcj.setNumReduceTasks(0);

Make it greater than 0, to whatever value you want. Then the reducer will work.
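For instance, a one-line change in the driver:

// setting no of reducers: any value greater than 0 enables the reduce phase
mwcj.setNumReduceTasks(1);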

Upvotes: 3
