Reputation: 1695
My input folder has 200 files. I want MultipleOutputs to write the parsed input from each file (identified using "map.input.file") into an output file with the same name. Since I don't have any aggregation to perform, I am using zero reducers (conf.setNumReduceTasks(0)). Ideally, I should get 200 output files.
But my output has 5000+ files, each containing only one line (of streaming output). Clearly, it is not aggregating. My assumption is that, even with zero reducers, the mapper output should be aggregated.
Help is appreciated. Thanks!
/**
 * Configures and runs a map-only job that copies each input line into the
 * MultipleOutputs multi-named output "mofiles"; the mapper picks the output name.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws IOException if the job cannot be submitted or fails
 */
public static void main(String[] args) throws IOException {
    if (args.length != 2) {
        System.err.println("Usage: MultipleOutputEx <input path> <output path>");
        System.exit(-1);
    }
    JobConf conf = new JobConf(MultipleOutputEx.class);
    conf.setJobName("Duration Count");
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // Map-only job: with zero reduce tasks the mapper output goes straight to the
    // OutputFormat. No shuffle, sort or grouping happens, so records are never
    // aggregated across map tasks, and a reducer class would never run.
    conf.setNumReduceTasks(0);
    conf.setMapperClass(MultipleOutputExMapper.class);
    // With no reducers, the job's output types are the mapper's output types.
    conf.setOutputKeyClass(NullWritable.class);
    conf.setOutputValueClass(Text.class);
    MultipleOutputs.addMultiNamedOutput(conf, "mofiles", TextOutputFormat.class,
            NullWritable.class, Text.class);
    JobClient.runJob(conf);
}
And my Mapper class is:
public class MultipleOutputExMapper extends MapReduceBase implements
Mapper<LongWritable, Text, NullWritable, Text> {
MultipleOutputs mos = null;
Text fileKey = new Text();
String line = "";
private JobConf conf;
@Override
public void configure(JobConf conf) {
this.conf = conf;
mos = new MultipleOutputs(conf);
}
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, Text> output, Reporter reporter)
throws IOException {
try {
String filename = conf.get("map.input.file");
fileKey.set(filename);
OutputCollector<NullWritable, Text> collector = mos.getCollector(
"mofiles", key.toString(), reporter);
collector.collect(NullWritable.get(), value);
} catch (ArrayIndexOutOfBoundsException E) {
E.printStackTrace();
} catch (Exception E) {
System.out.println(line);
E.printStackTrace();
}
}
@Override
public void close() throws IOException {
mos.close();
}
Upvotes: 1
Views: 317
Reputation: 30089
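You're creating an output file for each unique key (as suggested by @climbage in his comment). The key passed to map() is the byte offset of the line within the input file, so key.toString() names a new output for every single line. Try amending to this (untested and uncompiled):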
// Collector for this task's input file; created lazily by map().
protected OutputCollector<NullWritable, Text> collector;
// Output name derived from the task's input file; assigned in configure().
protected String filename;
/**
 * Opens MultipleOutputs for this task and derives the output multi-name from
 * the task's input file ("map.input.file").
 *
 * <p>The old-API MultipleOutputs rejects multi-names that contain anything other
 * than letters and digits. Only those characters of the file name are therefore
 * kept; for example, {@code data-01.txt} becomes {@code data01txt}. Without this,
 * every getCollector() call fails with IllegalArgumentException. The name falls
 * back to "unnamed" when the property is missing or no characters survive.
 */
@Override
public void configure(JobConf conf) {
    this.conf = conf;
    mos = new MultipleOutputs(conf);
    String inputFile = conf.get("map.input.file");
    // just the name, not the path
    String name = (inputFile == null) ? "" : new Path(inputFile).getName();
    name = name.replaceAll("[^A-Za-z0-9]", "");
    filename = name.isEmpty() ? "unnamed" : name;
}
/**
 * Writes each record unchanged to the "mofiles" multi-named output for this
 * task's input file.
 *
 * <p>Errors are not caught. A bad multi-name or a failed write fails the task
 * instead of being printed and silently dropping the record.
 *
 * @throws IOException if the record cannot be written
 */
public void map(LongWritable key, Text value,
        OutputCollector<NullWritable, Text> output, Reporter reporter)
        throws IOException {
    if (collector == null) {
        // NOTE(review): assumes a FileInputFormat-style input in which each map task
        // reads from a single file, so one collector serves every record of the task.
        collector = mos.getCollector("mofiles", filename, reporter);
    }
    collector.collect(NullWritable.get(), value);
}
/** Closes all named outputs that this task opened through {@code mos}. */
@Override
public void close() throws IOException {
    this.mos.close();
}
Upvotes: 2