Reputation: 31
I have 4 files in a folder, and the folder's location is my input path argument. I need to find the word count of each file individually and write it to an output file with the same name as the input file.
I have written a mapper class that writes its output correctly to the specified files. But that output is not processed by the reducer. What I did wrong: I didn't write the mapper's output through 'context', so nothing is passed to the reducer and blank output is produced. Still, the mapper executed as desired and placed the files at the correct location with the expected file names. I want shuffle and sort and the reducer to work on this data / to have those pairs passed to the reducer. Please correct me. Thanks.
Mapper
package com.oracle.hadoop.multiwordcount;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiWordCountMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    protected String filenamekey;
    private RecordWriter<Text, LongWritable> writer;

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read the line
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Assign count(1) to each word
        for (String word : words) {
            writer.write(new Text(word), new LongWritable(1));
        }
    }

    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();
        // extract parent folder and filename
        filenamekey = path.getParent().getName() + "/" + path.getName();
        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenamekey);
        // We need to override the getDefaultWorkFile path to stop the file
        // being created in the _temporary/taskid folder
        TextOutputFormat<Text, LongWritable> tof = new TextOutputFormat<Text, LongWritable>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };
        // create a record writer that will write to the desired output
        // subfolder
        writer = tof.getRecordWriter(context);
    }

    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}
Reducer
package com.oracle.hadoop.multiwordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiWordCountReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    /*
     * private MultipleOutputs multiouputs;
     *
     * protected void setup(Context context) throws java.io.IOException,
     * InterruptedException { multiouputs = new MultipleOutputs(context); }
     */

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws java.io.IOException, InterruptedException {
        // Sum the list of values
        long sum = 0;
        for (LongWritable value : values) {
            sum = sum + value.get();
        }
        // Assign the sum to the corresponding word
        context.write(key, new LongWritable(sum));
    }

    /*
     * protected void cleanup(Context context) throws java.io.IOException,
     * InterruptedException { multiouputs.close(); }
     */
}
Driver
package com.oracle.hadoop.multiwordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiWordCountJob implements Tool {

    private Configuration conf;

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        @SuppressWarnings("deprecation")
        Job mwcj = new Job(getConf());
        // setting the job name
        mwcj.setJobName("Multiple file WordCount Job");
        // to call this as a jar
        mwcj.setJarByClass(this.getClass());
        // setting custom mapper class
        mwcj.setMapperClass(MultiWordCountMapper.class);
        // setting custom reducer class
        mwcj.setReducerClass(MultiWordCountReducer.class);
        // setting no of reducers
        // mwcj.setNumReduceTasks(0);
        // setting custom partitioner class
        // mwcj.setPartitionerClass(WordCountPartitioner.class);
        // setting mapper output key class: K2
        mwcj.setMapOutputKeyClass(Text.class);
        // setting mapper output value class: V2
        mwcj.setMapOutputValueClass(LongWritable.class);
        // setting reducer output key class: K3
        mwcj.setOutputKeyClass(Text.class);
        // setting reducer output value class: V3
        mwcj.setOutputValueClass(LongWritable.class);
        // setting the input format class, i.e. for K1, V1
        mwcj.setInputFormatClass(TextInputFormat.class);
        // setting the output format class
        LazyOutputFormat.setOutputFormatClass(mwcj, TextOutputFormat.class);
        // mwcj.setOutputFormatClass(TextOutputFormat.class);
        // setting the input file path
        FileInputFormat.addInputPath(mwcj, new Path(args[0]));
        // setting the output folder path
        FileOutputFormat.setOutputPath(mwcj, new Path(args[1]));
        Path outputpath = new Path(args[1]);
        // delete the output folder if it exists
        outputpath.getFileSystem(conf).delete(outputpath, true);
        // execute the job and return the status
        return mwcj.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(),
                new MultiWordCountJob(), args);
        System.out.println("My Status: " + status);
    }
}
Upvotes: 0
Views: 703
Reputation: 7462
Use MultipleOutputs instead of writing directly to a file, and then use the context.write() method as usual to pass key-value pairs to the reducer.
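Applied to the posted mapper, that means emitting through the framework instead of the private RecordWriter. A minimal sketch (folding the file name into the key with a "::" separator is my illustrative choice, not something the original code does):

// inside map(), in place of writer.write(...):
for (String word : words) {
    // this pair goes through shuffle/sort and reaches the reducer
    context.write(new Text(filenamekey + "::" + word), new LongWritable(1));
}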
Of course, as siddhartha jain says, you cannot have a reduce phase if you specify numReduceTasks as 0. In that case, the job ends at the map phase.
Quoting MultipleOutputs:
The MultipleOutputs class simplifies writing to additional outputs other than the job default output via the OutputCollector passed to the map() and reduce() methods of the Mapper and Reducer implementations.
...
When named outputs are used within a Mapper implementation, key/values written to a named output are not part of the reduce phase, only key/values written to the job OutputCollector are part of the reduce phase.
For handling each input file individually, see my answer in your related post.
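Putting it together, a minimal sketch of the reducer side, assuming the composite filenamekey::word keys from the snippet above (the mos field name and the key splitting are my assumptions; note that MultipleOutputs appends a -r-NNNNN suffix to the base path it is given):

package com.oracle.hadoop.multiwordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiWordCountReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    private MultipleOutputs<Text, LongWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, LongWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        // sum the counts for this (file, word) pair
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // split the composite key back into "parent/filename" and the word
        String[] parts = key.toString().split("::", 2);
        // write to <outputdir>/<parent/filename>-r-00000 instead of the default part file
        mos.write(new Text(parts[1]), new LongWritable(sum), parts[0]);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}

Since your driver already uses LazyOutputFormat, the empty default part-r-* files will not be created alongside the per-file outputs.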
Upvotes: 0
Reputation: 1006
In your driver class, the number of reducers you set is 0:
// setting no of reducers
mwcj.setNumReduceTasks(0);
Set it to any value greater than 0, and then the reducer will run.
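For example, in the posted driver (1 is just the smallest value that re-enables the shuffle/sort and reduce phase):

// setting no of reducers: any value >= 1 makes the reducer run
mwcj.setNumReduceTasks(1);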
Upvotes: 3