Reputation: 3811
I'm a newbie in Hadoop. I'm trying out the Wordcount program.
Now, to try out multiple output files, I use MultipleOutputFormat.
This link helped me in doing it.
In my driver class I had:
// Register the two named outputs ("even" and "odd") on the job configuration.
// Each uses the old-API (org.apache.hadoop.mapred) TextOutputFormat with Text keys
// and IntWritable values, matching what the reducer emits through
// mos.getCollector(...).collect(key, new IntWritable(sum)).
MultipleOutputs.addNamedOutput(conf, "even",
        org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
        IntWritable.class);
MultipleOutputs.addNamedOutput(conf, "odd",
        org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
        IntWritable.class);
and my reduce class became this
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
MultipleOutputs mos = null;
public void configure(JobConf job) {
mos = new MultipleOutputs(job);
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum +=;
if (sum % 2 == 0) {
mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
}else {
mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
//output.collect(key, new IntWritable(sum));
public void close() throws IOException {
// TODO Auto-generated method stub
Things worked, but I get a LOT of files (one odd and one even for every map-reduce).
The question is: how can I have just 2 output files (odd & even), so that every odd output of every map-reduce gets written into that odd file, and the same for even?
Upvotes: 16
Views: 8870
Reputation: 1234
You may try changing the output file name (the reducer output). Since HDFS supports append operations only, it will then collect all the Temp-r-0000x files (partitions) from all reducers and put them together in one file.
Here is the class you need to create, which overrides methods in TextOutputFormat:
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class CustomNameMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {
private String folderName;
private class MultipleFilesRecordWriter extends RecordWriter<K, V> {
private Map<String, RecordWriter<K, V>> fileNameToWriter;
private FolderNameExtractor<K, V> fileNameExtractor;
private TaskAttemptContext job;
public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
this.fileNameExtractor = fileNameExtractor;
this.job = job;
public void write(K key, V value) throws IOException, InterruptedException {
String fileName = "**[FOLDER_NAME_INCLUDING_SUB_DIRS]**";//fileNameExtractor.extractFolderName(key, value);
RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
if (writer == null) {
writer = createNewWriter(fileName, fileNameToWriter, job);
if (writer == null) {
throw new IOException("Unable to create writer for path: " + fileName);
writer.write(key, value);
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
private synchronized RecordWriter<K, V> createNewWriter(String folderName,
Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
try {
this.folderName = folderName;
RecordWriter<K, V> writer = super.getRecordWriter(job);
this.folderName = null;
fileNameToWriter.put(folderName, writer);
return writer;
} catch (Exception e) {
return null;
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
Path path = super.getDefaultWorkFile(context, extension);
if (folderName != null) {
String newPath = path.getParent().toString() + "/" + folderName + "/**[ONE_FILE_NAME]**";
path = new Path(newPath);
return path;
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
public FolderNameExtractor<K, V> getFolderNameExtractor() {
return new KeyFolderNameExtractor<K, V>();
public interface FolderNameExtractor<K, V> {
public String extractFolderName(K key, V value);
private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
public String extractFolderName(K key, V value) {
return key.toString();
then Reducer/Mapper:
public static class ExtraLabReducer extends Reducer<CustomKeyComparable, Text, CustomKeyComparable, Text>
MultipleOutputs multipleOutputs;
protected void setup(Context context) throws IOException, InterruptedException {
multipleOutputs = new MultipleOutputs(context);
public void reduce(CustomKeyComparable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
for(Text d : values)
**multipleOutputs.write**("batta",key, d,**"[EXAMPLE_FILE_NAME]"**);
protected void cleanup(Context context) throws IOException, InterruptedException {
then in job config:
// Build the job from the tool's configuration and name it "ExtraLab".
Job job = new Job(getConf(), "ExtraLab");
// The first command-line argument is used as the input path.
FileInputFormat.addInputPath(job, new Path(args[0]));
//adding one more reducer
// NOTE(review): no statement follows the comment above in this snippet; presumably a
// call that sets the number of reduce tasks was intended here - confirm.
// Output format is registered through LazyOutputFormat with TextOutputFormat as the
// underlying format (presumably so empty default part files are not created - confirm).
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// Register the "batta" named output, written through CustomNameMultipleFileOutputFormat
// with CustomKeyComparable keys and Text values.
MultipleOutputs.addNamedOutput(job,"batta", CustomNameMultipleFileOutputFormat.class,CustomKeyComparable.class,Text.class);
Upvotes: 0
Reputation: 41
I wrote a class for doing this. Just use it in your job.
This is my class:
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
* TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br>
* <p>
* <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an
* {@link RecordWriter} instance per folder name.
* </p>
* <p>
* In this class the folder name is defined by the written entry's key.<br>
* To change this behavior simply extend this class and override the
* {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own
* {@link FolderNameExtractor} implementation.
* </p>
* @author ykesten
* @param <K> - Keys type
* @param <V> - Values type
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {
private String folderName;
private class MultipleFilesRecordWriter extends RecordWriter<K, V> {
private Map<String, RecordWriter<K, V>> fileNameToWriter;
private FolderNameExtractor<K, V> fileNameExtractor;
private TaskAttemptContext job;
public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
this.fileNameExtractor = fileNameExtractor;
this.job = job;
public void write(K key, V value) throws IOException, InterruptedException {
String fileName = fileNameExtractor.extractFolderName(key, value);
RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
if (writer == null) {
writer = createNewWriter(fileName, fileNameToWriter, job);
if (writer == null) {
throw new IOException("Unable to create writer for path: " + fileName);
writer.write(key, value);
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
private synchronized RecordWriter<K, V> createNewWriter(String folderName,
Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
try {
this.folderName = folderName;
RecordWriter<K, V> writer = super.getRecordWriter(job);
this.folderName = null;
fileNameToWriter.put(folderName, writer);
return writer;
} catch (Exception e) {
return null;
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
Path path = super.getDefaultWorkFile(context, extension);
if (folderName != null) {
String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName();
path = new Path(newPath);
return path;
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
public FolderNameExtractor<K, V> getFolderNameExtractor() {
return new KeyFolderNameExtractor<K, V>();
public interface FolderNameExtractor<K, V> {
public String extractFolderName(K key, V value);
private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
public String extractFolderName(K key, V value) {
return key.toString();
Upvotes: 3
Reputation: 2830
Multiple output files will be generated based on the number of reducers.
You can use hadoop dfs -getmerge to merge the outputs.
Upvotes: 1
Reputation: 12883
Each reducer uses an OutputFormat to write records to. So that's why you are getting a set of odd and even files per reducer. This is by design so that each reducer can perform writes in parallel.
If you want just a single odd and single even file, you'll need to set mapred.reduce.tasks to 1. But performance will suffer, because all the mappers will be feeding into a single reducer.
Another option is to change the process that reads these files to accept multiple input files, or write a separate process that merges these files together.
Upvotes: 3